This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b761f3  [Feature](Connector) Use Array[String] instead of defaulting to String (#362)
4b761f3 is described below

commit 4b761f379d58cbba32a3c0735ae5868d0862dc00
Author: Adesh Nalpet Adimurthy <[email protected]>
AuthorDate: Wed Jun 3 03:01:41 2026 -0400

    [Feature](Connector) Use Array[String] instead of defaulting to String (#362)
---
 .../spark/client/read/AbstractThriftReader.java    |   7 +-
 .../spark/client/read/DorisFlightSqlReader.java    |   4 +-
 .../apache/doris/spark/client/read/RowBatch.java   |  24 ++++-
 .../apache/doris/spark/config/DorisOptions.java    |   2 +
 .../doris/spark/sql/DorisRowFlightSqlReader.scala  |   2 +
 .../doris/spark/sql/DorisRowThriftReader.scala     |   2 +
 .../doris/spark/sql/sources/DorisRelation.scala    |   3 +-
 .../apache/doris/spark/util/RowConvertors.scala    |  12 ++-
 .../apache/doris/spark/util/SchemaConvertors.scala |   6 +-
 .../doris/spark/util/RowConvertorsTest.scala       |  12 +++
 .../doris/spark/util/SchemaConvertorsTest.scala    |   8 +-
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 104 +++++++++++++++++++++
 .../doris/spark/catalog/DorisTableBase.scala       |   8 +-
 13 files changed, 179 insertions(+), 15 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index 76ea2a9..3c37e5c 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -76,6 +76,8 @@ public abstract class AbstractThriftReader extends 
DorisReader {
 
     private final Boolean datetimeJava8ApiEnabled;
 
+    private final boolean arrayNativeType;
+
     protected AbstractThriftReader(DorisReaderPartition partition) throws 
Exception {
         super(partition);
         this.frontend = new DorisFrontendClient(config);
@@ -112,6 +114,7 @@ public abstract class AbstractThriftReader extends 
DorisReader {
             this.asyncThread = null;
         }
         this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
+        this.arrayNativeType = 
config.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE);
     }
 
     private void runAsync() throws DorisException, InterruptedException {
@@ -128,7 +131,7 @@ public abstract class AbstractThriftReader extends 
DorisReader {
             });
             endOfStream.set(nextResult.isEos());
             if (!endOfStream.get()) {
-                rowBatch = new RowBatch(nextResult, dorisSchema, 
datetimeJava8ApiEnabled);
+                rowBatch = new RowBatch(nextResult, dorisSchema, 
datetimeJava8ApiEnabled, arrayNativeType);
                 offset += rowBatch.getReadRowCount();
                 rowBatch.close();
                 rowBatchQueue.put(rowBatch);
@@ -187,7 +190,7 @@ public abstract class AbstractThriftReader extends 
DorisReader {
                 });
                 endOfStream.set(nextResult.isEos());
                 if (!endOfStream.get()) {
-                    rowBatch = new RowBatch(nextResult, dorisSchema, 
datetimeJava8ApiEnabled);
+                    rowBatch = new RowBatch(nextResult, dorisSchema, 
datetimeJava8ApiEnabled, arrayNativeType);
                 } else {
                     logger.info(
                             "Scan finished, tablets: {}, offset: {}",
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index b691f83..ef35554 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -63,6 +63,7 @@ public class DorisFlightSqlReader extends DorisReader {
     private AdbcConnection connection;
     private final ArrowReader arrowReader;
     private final Boolean datetimeJava8ApiEnabled;
+    private final boolean arrayNativeType;
     private int totalBatches = 0;
     private long totalRows = 0;
 
@@ -89,6 +90,7 @@ public class DorisFlightSqlReader extends DorisReader {
         this.schema = processDorisSchema(partition);
         this.arrowReader = executeQuery();
         this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
+        this.arrayNativeType = 
config.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE);
     }
 
     @Override
@@ -111,7 +113,7 @@ public class DorisFlightSqlReader extends DorisReader {
                 throw new DorisException(e);
             }
             if (!endOfStream.get()) {
-                rowBatch = new RowBatch(arrowReader, schema, 
datetimeJava8ApiEnabled);
+                rowBatch = new RowBatch(arrowReader, schema, 
datetimeJava8ApiEnabled, arrayNativeType);
             }
         }
         return !endOfStream.get();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index d937edd..fe5c59f 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -106,12 +106,19 @@ public class RowBatch implements Serializable {
 
     private final Boolean datetimeJava8ApiEnabled;
 
+    private final boolean arrayNativeType;
+
     public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean 
datetimeJava8ApiEnabled) throws DorisException {
+        this(nextResult, schema, datetimeJava8ApiEnabled, false);
+    }
+
+    public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean 
datetimeJava8ApiEnabled, boolean arrayNativeType) throws DorisException {
 
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
         this.arrowReader = new ArrowStreamReader(new 
ByteArrayInputStream(nextResult.getRows()), rootAllocator);
         this.schema = schema;
         this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
+        this.arrayNativeType = arrayNativeType;
 
         try {
             VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -128,10 +135,15 @@ public class RowBatch implements Serializable {
     }
 
     public RowBatch(ArrowReader reader, Schema schema, Boolean 
datetimeJava8ApiEnabled) throws DorisException {
+        this(reader, schema, datetimeJava8ApiEnabled, false);
+    }
+
+    public RowBatch(ArrowReader reader, Schema schema, Boolean 
datetimeJava8ApiEnabled, boolean arrayNativeType) throws DorisException {
 
         this.arrowReader = reader;
         this.schema = schema;
         this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
+        this.arrayNativeType = arrayNativeType;
 
         try {
             VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -483,8 +495,16 @@ public class RowBatch implements Serializable {
                                 addValueToRow(rowIndex, null);
                                 continue;
                             }
-                            String value = 
listVector.getObject(rowIndex).toString();
-                            addValueToRow(rowIndex, value);
+                            List<?> rawList = listVector.getObject(rowIndex);
+                            if (arrayNativeType) {
+                                List<String> stringList = new 
ArrayList<>(rawList.size());
+                                for (Object item : rawList) {
+                                    stringList.add(item == null ? null : 
item.toString());
+                                }
+                                addValueToRow(rowIndex, stringList);
+                            } else {
+                                addValueToRow(rowIndex, rawList.toString());
+                            }
                         }
                         break;
                     case "MAP":
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 940f06b..1d77d63 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -142,6 +142,8 @@ public class DorisOptions {
 
     public static final ConfigOption<Boolean> DORIS_READ_BITMAP_TO_BASE64 = 
ConfigOptions.name("doris.read.bitmap-to-base64").booleanType().defaultValue(false).withDescription("");
 
+    public static final ConfigOption<Boolean> DORIS_READ_ARRAY_NATIVE_TYPE = 
ConfigOptions.name("doris.read.array.native-type").booleanType().defaultValue(false).withDescription("If
 true, Doris ARRAY columns are read as Spark ArrayType(StringType). If false 
(default), they are read as a JSON-encoded String for backward compatibility.");
+
     public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = 
ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 
1024).withDescription("");
 
     public static final ConfigOption<Boolean> DORIS_SINK_HTTP_UTF8_CHARSET = 
ConfigOptions.name("doris.sink.http-utf8-charset").booleanType().defaultValue(false).withDescription("");
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
index 03796c6..c4b1268 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
@@ -36,6 +36,8 @@ class DorisRowFlightSqlReader(partition: 
DorisReaderPartition) extends DorisFlig
     rowBatch.next.asScala.zipWithIndex.foreach {
       case (s, index) if index < row.values.size && 
s.isInstanceOf[java.util.HashMap[String, String]] =>
         row.values.update(index, s.asInstanceOf[java.util.HashMap[String, 
String]].asScala)
+      case (s, index) if index < row.values.size && 
s.isInstanceOf[java.util.List[_]] =>
+        row.values.update(index, s.asInstanceOf[java.util.List[_]].asScala)
       case (s, index) if index < row.values.size => row.values.update(index, s)
       case _ => // nothing
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
index 9c3cefa..6215c5b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
@@ -36,6 +36,8 @@ class DorisRowThriftReader(partition: DorisReaderPartition) 
extends DorisThriftR
     rowBatch.next.asScala.zipWithIndex.foreach {
       case (s, index) if index < row.values.size && 
s.isInstanceOf[java.util.HashMap[String, String]] =>
         row.values.update(index, s.asInstanceOf[java.util.HashMap[String, 
String]].asScala)
+      case (s, index) if index < row.values.size && 
s.isInstanceOf[java.util.List[_]] =>
+        row.values.update(index, s.asInstanceOf[java.util.List[_]].asScala)
       case (s, index) if index < row.values.size => row.values.update(index, s)
       case _ => // nothing
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
index 55249e9..6fd7af5 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
@@ -45,8 +45,9 @@ private[sql] class DorisRelation(
     val tableIdentifier = cfg.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
     val tableIdentifierArr = 
tableIdentifier.split("\\.").map(_.replaceAll("`", ""))
     val dorisSchema = frontend.getTableSchema(tableIdentifierArr(0), 
tableIdentifierArr(1))
+    val arrayNativeType = 
cfg.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE)
     StructType(dorisSchema.getProperties.asScala.map(field => {
-      StructField(field.getName, 
SchemaConvertors.toCatalystType(field.getType, field.getPrecision, 
field.getScale))
+      StructField(field.getName, 
SchemaConvertors.toCatalystType(field.getType, field.getPrecision, 
field.getScale, arrayNativeType))
     }))
 
   }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index 2d8b4d9..ba9d46f 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, 
GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -131,6 +131,16 @@ object RowConvertors {
         val keys = map.keys.toArray.map(UTF8String.fromString)
         val values = map.values.toArray.map(UTF8String.fromString)
         ArrayBasedMapData(keys, values)
+      case at: ArrayType =>
+        val list = v.asInstanceOf[java.util.List[_]]
+        val elements = new Array[Any](list.size())
+        var i = 0
+        while (i < list.size()) {
+          val elem = list.get(i)
+          elements(i) = if (elem == null) null else convertValue(elem, 
at.elementType, datetimeJava8ApiEnabled)
+          i += 1
+        }
+        new GenericArrayData(elements)
       case NullType | BooleanType | ByteType | ShortType | IntegerType | 
LongType | FloatType | DoubleType | BinaryType | _: DecimalType => v
       case _ => throw new Exception(s"Unsupported spark type: 
${dataType.typeName}")
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
index 91b8317..8285064 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
@@ -20,12 +20,12 @@ package org.apache.doris.spark.util
 
 import org.apache.doris.sdk.thrift.TScanColumnDesc
 import org.apache.doris.spark.rest.models.{Field, Schema}
-import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, MapType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, 
DecimalType, MapType}
 
 object SchemaConvertors {
 
   @throws[IllegalArgumentException]
-  def toCatalystType(dorisType: String, precision: Int, scale: Int): DataType 
= {
+  def toCatalystType(dorisType: String, precision: Int, scale: Int, 
arrayNativeType: Boolean = false): DataType = {
     dorisType match {
       case "NULL_TYPE" => DataTypes.NullType
       case "BOOLEAN" => DataTypes.BooleanType
@@ -52,7 +52,7 @@ object SchemaConvertors {
       case "DECIMAL128" => DecimalType(precision, scale)
       case "TIME" => DataTypes.DoubleType
       case "STRING" => DataTypes.StringType
-      case "ARRAY" => DataTypes.StringType
+      case "ARRAY" => if (arrayNativeType) ArrayType(DataTypes.StringType, 
containsNull = true) else DataTypes.StringType
       case "MAP" => MapType(DataTypes.StringType, DataTypes.StringType)
       case "STRUCT" => DataTypes.StringType
       case "VARIANT" => DataTypes.StringType
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index 5a08a32..6ec4712 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -119,6 +119,18 @@ class RowConvertorsTest {
     Assert.assertTrue(RowConvertors.convertValue(map, 
MapType(DataTypes.StringType, DataTypes.StringType), 
false).isInstanceOf[MapData])
     Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType, 
false).isInstanceOf[UTF8String])
 
+    val list = new util.ArrayList[String]()
+    list.add("a")
+    list.add(null)
+    list.add("b")
+    val arrayValue = RowConvertors.convertValue(list, 
ArrayType(DataTypes.StringType), false)
+    Assert.assertTrue(arrayValue.isInstanceOf[ArrayData])
+    val arrayData = arrayValue.asInstanceOf[ArrayData]
+    Assert.assertEquals(3, arrayData.numElements())
+    Assert.assertEquals(UTF8String.fromString("a"), arrayData.getUTF8String(0))
+    Assert.assertTrue(arrayData.isNullAt(1))
+    Assert.assertEquals(UTF8String.fromString("b"), arrayData.getUTF8String(2))
+
   }
 
 }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
index b259bc1..f014b5f 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.doris.spark.util
 
-import org.apache.spark.sql.types.{DataTypes, DecimalType, MapType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, MapType}
 import org.junit.Assert
 import org.junit.jupiter.api.Test
 
@@ -63,4 +63,10 @@ class SchemaConvertorsTest {
 
   }
 
+  @Test
+  def toCatalystTypeArrayNativeTypeTest(): Unit = {
+    Assert.assertEquals(SchemaConvertors.toCatalystType("ARRAY", -1, -1, 
true), ArrayType(DataTypes.StringType, true))
+    Assert.assertEquals(SchemaConvertors.toCatalystType("ARRAY", -1, -1, 
false), DataTypes.StringType)
+  }
+
 }
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 2ed6188..8815714 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -562,6 +562,110 @@ class DorisReaderITCase(readMode: String, flightSqlPort: 
Int) extends AbstractCo
     }
   }
 
+  @Test
+  def testReadArrayNativeType(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+           | "doris.read.array.native-type"="true"
+           |)
+           |""".stripMargin)
+
+      val schemaType = session.sql("select c15 from 
test_source").schema.fields(0).dataType.typeName
+      assert("array".equals(schemaType))
+
+      val sizes = session.sql(
+        """
+          |select id, size(c15) as sz from test_source where c15 is not null 
order by id
+          |""".stripMargin).collect().toList.toString()
+      assert("List([1,2], [2,2], [3,2])".equals(sizes))
+
+      val arrayRows = session.sql(
+        """
+          |select id, c15 from test_source order by id
+          |""".stripMargin).collect()
+
+      assert(arrayRows(0).getList[String](1).toString == "[Alice, Bob]")
+      assert(arrayRows(1).getList[String](1).toString == "[Charlie, David]")
+      assert(arrayRows(2).getList[String](1).toString == "[Eve, Frank]")
+      assert(arrayRows(3).isNullAt(1))
+    } finally {
+      session.stop()
+    }
+  }
+
+  @Test
+  def testDataFrameSourceArrayNativeType(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val dorisSparkDF = session.read
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + 
TABLE_READ_TBL_ALL_TYPES)
+        .option("doris.user", getDorisUsername)
+        .option("doris.password", getDorisPassword)
+        .option("doris.read.mode", readMode)
+        .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString)
+        .option("doris.read.array.native-type", "true")
+        .load()
+
+      assert("array".equals(dorisSparkDF.schema("c15").dataType.typeName))
+
+      val sizes = dorisSparkDF
+        .where("c15 is not null")
+        .selectExpr("id", "size(c15) as sz")
+        .orderBy("id")
+        .collect().toList.toString()
+      assert("List([1,2], [2,2], [3,2])".equals(sizes))
+    } finally {
+      session.stop()
+    }
+  }
+
+  @Test
+  def testReadArrayDefaultsToString(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+
+      val schemaType = session.sql("select c15 from 
test_source").schema.fields(0).dataType.typeName
+      assert("string".equals(schemaType))
+    } finally {
+      session.stop()
+    }
+  }
+
   @Test
   def testExpressionNotPushDown(): Unit = {
     val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_filter_pushdown.sql")
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 2df410b..214732f 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.language.implicitConversions
 
 
 abstract class DorisTableBase(identifier: Identifier, config: DorisConfig, 
schema: Option[StructType]) extends Table with SupportsRead with SupportsWrite {
@@ -42,7 +41,7 @@ abstract class DorisTableBase(identifier: Identifier, config: 
DorisConfig, schem
 
   override def schema(): StructType = schema.getOrElse({
     val dorisSchema = frontend.getTableSchema(identifier.namespace()(0), 
identifier.name())
-    dorisSchema
+    dorisSchemaToStructType(dorisSchema)
   })
 
   override def capabilities(): util.Set[TableCapability] = {
@@ -69,9 +68,10 @@ abstract class DorisTableBase(identifier: Identifier, 
config: DorisConfig, schem
     createWriteBuilder(config, logicalWriteInfo.schema())
   }
 
-  private implicit def dorisSchemaToStructType(dorisSchema: Schema): 
StructType = {
+  private def dorisSchemaToStructType(dorisSchema: Schema): StructType = {
+    val arrayNativeType = 
config.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE)
     StructType(dorisSchema.getProperties.asScala.map(field => {
-      StructField(field.getName, 
SchemaConvertors.toCatalystType(field.getType, field.getPrecision, 
field.getScale))
+      StructField(field.getName, 
SchemaConvertors.toCatalystType(field.getType, field.getPrecision, 
field.getScale, arrayNativeType))
     }))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to