This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 61f8307 array type support. (#75)
61f8307 is described below
commit 61f83074d229a786f8d90e94cd525d4cae384591
Author: benjobs <[email protected]>
AuthorDate: Tue Mar 14 09:26:42 2023 +0800
array type support. (#75)
* scala version bug fixed, array type convert bug fixed
---------
Co-authored-by: benjobs <[email protected]>
---
spark-doris-connector/pom.xml | 1 -
.../apache/doris/spark/serialization/RowBatch.java | 16 +++++++-
.../org/apache/doris/spark/sql/SchemaUtils.scala | 43 +++++++++++-----------
3 files changed, 36 insertions(+), 24 deletions(-)
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index ecfa032..e89813d 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -64,7 +64,6 @@
<properties>
<spark.version>3.1.2</spark.version>
<spark.minor.version>3.1</spark.minor.version>
- <scala.version>2.12.8</scala.version>
<scala.version>2.12</scala.version>
<libthrift.version>0.13.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index 72c7ac9..e666de5 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -38,6 +38,7 @@ import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.doris.spark.exception.DorisException;
@@ -87,7 +88,7 @@ public class RowBatch {
this.arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(nextResult.getRows()),
rootAllocator
- );
+ );
try {
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
@@ -275,6 +276,19 @@ public class RowBatch {
                         addValueToRow(rowIndex, value);
                     }
                     break;
+                case "ARRAY":
+                    Preconditions.checkArgument(mt.equals(Types.MinorType.LIST),
+                            typeMismatchMessage(currentType, mt));
+                    ListVector listVector = (ListVector) curFieldVector;
+                    for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                        if (listVector.isNull(rowIndex)) {
+                            addValueToRow(rowIndex, null);
+                            continue;
+                        }
+                        String value = listVector.getObject(rowIndex).toString();
+                        addValueToRow(rowIndex, value);
+                    }
+                    break;
                 default:
                     String errMsg = "Unsupported type " + schema.get(col).getType();
                     logger.error(errMsg);
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 15ad7a1..2565aff 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -17,8 +17,7 @@
package org.apache.doris.spark.sql
-import scala.collection.JavaConverters._
-
+import scala.collection.JavaConversions._
import org.apache.doris.spark.cfg.Settings
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.RestService
@@ -26,16 +25,14 @@ import org.apache.doris.spark.rest.models.{Field, Schema}
import org.apache.doris.thrift.TScanColumnDesc
import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
import org.apache.spark.sql.types._
-
import org.slf4j.LoggerFactory
-import scala.collection.mutable
-
private[spark] object SchemaUtils {
   private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
/**
* discover Doris table schema from Doris FE.
+ *
* @param cfg configuration
* @return Spark Catalyst StructType
*/
@@ -46,6 +43,7 @@ private[spark] object SchemaUtils {
/**
* discover Doris table schema from Doris FE.
+ *
* @param cfg configuration
* @return inner schema struct
*/
@@ -55,35 +53,34 @@ private[spark] object SchemaUtils {
/**
* convert inner schema struct to Spark Catalyst StructType
+ *
* @param schema inner schema
* @return Spark Catalyst StructType
*/
   def convertToStruct(dorisReadFields: String, schema: Schema): StructType = {
-    var fieldList = new Array[String](schema.size())
-    val fieldSet = new mutable.HashSet[String]()
-    var fields = List[StructField]()
-    if (dorisReadFields != null && dorisReadFields.length > 0) {
-      fieldList = dorisReadFields.split(",")
-      for (field <- fieldList) {
-        fieldSet.add(field)
-      }
-      schema.getProperties.asScala.foreach(f =>
-        if (fieldSet.contains(f.getName)) {
-          fields :+= DataTypes.createStructField(f.getName, getCatalystType(f.getType, f.getPrecision, f.getScale), true)
-        })
+    val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) {
+      dorisReadFields.split(",")
     } else {
-      schema.getProperties.asScala.foreach(f =>
-        fields :+= DataTypes.createStructField(f.getName, getCatalystType(f.getType, f.getPrecision, f.getScale), true)
-      )
+      Array.empty[String]
     }
-    DataTypes.createStructType(fields.asJava)
+    val fields = schema.getProperties
+      .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
+      .map(f =>
+        DataTypes.createStructField(
+          f.getName,
+          getCatalystType(f.getType, f.getPrecision, f.getScale),
+          true
+        )
+      )
+    DataTypes.createStructType(fields)
   }
/**
* translate Doris Type to Spark Catalyst type
+ *
* @param dorisType Doris type
* @param precision decimal precision
- * @param scale decimal scale
+ * @param scale decimal scale
* @return Spark Catalyst type
*/
   def getCatalystType(dorisType: String, precision: Int, scale: Int): DataType = {
@@ -112,6 +109,7 @@ private[spark] object SchemaUtils {
case "DECIMAL128I" => DecimalType(precision, scale)
case "TIME" => DataTypes.DoubleType
case "STRING" => DataTypes.StringType
+ case "ARRAY" => DataTypes.StringType
case "HLL" =>
throw new DorisException("Unsupported type " + dorisType)
case _ =>
@@ -121,6 +119,7 @@ private[spark] object SchemaUtils {
/**
* convert Doris return schema to inner schema struct.
+ *
* @param tscanColumnDescs Doris BE return schema
* @return inner schema struct
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]