This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c723a5  [fix] keep field order when writing struct type data (#169)
3c723a5 is described below

commit 3c723a5a85377e60435db7f18ef1483afd8a5f82
Author: gnehil <[email protected]>
AuthorDate: Fri Dec 8 22:01:33 2023 +0800

    [fix] keep field order when writing struct type data (#169)
    
    Co-authored-by: gnehil <gnehil489@github>
---
 .../org/apache/doris/spark/sql/SchemaUtils.scala   |  8 +---
 .../apache/doris/spark/sql/SchemaUtilsTest.scala   | 45 ++++++++++------------
 2 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index d56a4a3..032f16b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -31,10 +31,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory
 
-import java.sql.Timestamp
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneOffset}
-import java.util.Locale
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
@@ -194,11 +190,11 @@ private[spark] object SchemaUtils {
           }
         case st: StructType =>
           val structData = row.getStruct(ordinal, st.length)
-          val map = mutable.HashMap[String, Any]()
+          val map = new java.util.TreeMap[String, Any]()
           var i = 0
           while (i < structData.numFields) {
             val field = st.get(i)
-            map += field.name -> rowColumnValue(structData, i, field.dataType)
+            map.put(field.name, rowColumnValue(structData, i, field.dataType))
             i += 1
           }
           MAPPER.writeValueAsString(map)
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
index 7e6e5f5..4465989 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
@@ -17,44 +17,41 @@
 
 package org.apache.doris.spark.sql
 
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SparkSession}
-import org.junit.{Assert, Ignore, Test}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.{Assert, Test}
 
 import java.sql.{Date, Timestamp}
 
-@Ignore
 class SchemaUtilsTest {
 
   @Test
   def rowColumnValueTest(): Unit = {
 
-    val spark = SparkSession.builder().master("local").getOrCreate()
+    val row = InternalRow(
+      1,
+      CatalystTypeConverters.convertToCatalyst(Date.valueOf("2023-09-08")),
+      CatalystTypeConverters.convertToCatalyst(Timestamp.valueOf("2023-09-08 17:00:00")),
+      ArrayData.toArrayData(Array(1, 2, 3)),
+      CatalystTypeConverters.convertToCatalyst(Map[String, String]("a" -> "1")),
+      InternalRow(UTF8String.fromString("a"), 1)
+    )
 
-    val rdd = spark.sparkContext.parallelize(Seq(
-      Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3),
-        Map[String, String]("a" -> "1"), Row("a", 1))
-    ))
-    val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType)
+    val schema = new StructType().add("c1", IntegerType)
       .add("c2", DateType)
       .add("c3", TimestampType)
       .add("c4", ArrayType.apply(IntegerType))
       .add("c5", MapType.apply(StringType, StringType))
-      .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType)))))
-
-    val schema = df.schema
-
-    df.queryExecution.toRdd.foreach(row => {
-
-      val fields = schema.fields
-      Assert.assertEquals(1, SchemaUtils.rowColumnValue(row, 0, fields(0).dataType))
-      Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, fields(1).dataType))
-      Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, fields(2).dataType))
-      Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, fields(3).dataType))
-      Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
-      Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, fields(5).dataType))
-
-    })
+      .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType))))
+
+    Assert.assertEquals(1, SchemaUtils.rowColumnValue(row, 0, schema.fields(0).dataType))
+    Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, schema.fields(1).dataType))
+    Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, schema.fields(2).dataType))
+    Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, schema.fields(3).dataType))
+    Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, schema.fields(4).dataType))
+    Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, schema.fields(5).dataType))
 
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to