This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3c723a5 [fix] keep field order when writing struct type data (#169)
3c723a5 is described below
commit 3c723a5a85377e60435db7f18ef1483afd8a5f82
Author: gnehil <[email protected]>
AuthorDate: Fri Dec 8 22:01:33 2023 +0800
[fix] keep field order when writing struct type data (#169)
Co-authored-by: gnehil <gnehil489@github>
---
.../org/apache/doris/spark/sql/SchemaUtils.scala | 8 +---
.../apache/doris/spark/sql/SchemaUtilsTest.scala | 45 ++++++++++------------
2 files changed, 23 insertions(+), 30 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index d56a4a3..032f16b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -31,10 +31,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory
-import java.sql.Timestamp
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneOffset}
-import java.util.Locale
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -194,11 +190,11 @@ private[spark] object SchemaUtils {
}
case st: StructType =>
val structData = row.getStruct(ordinal, st.length)
- val map = mutable.HashMap[String, Any]()
+ val map = new java.util.TreeMap[String, Any]()
var i = 0
while (i < structData.numFields) {
val field = st.get(i)
- map += field.name -> rowColumnValue(structData, i, field.dataType)
+ map.put(field.name, rowColumnValue(structData, i, field.dataType))
i += 1
}
MAPPER.writeValueAsString(map)
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
index 7e6e5f5..4465989 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
@@ -17,44 +17,41 @@
package org.apache.doris.spark.sql
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SparkSession}
-import org.junit.{Assert, Ignore, Test}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.{Assert, Test}
import java.sql.{Date, Timestamp}
-@Ignore
class SchemaUtilsTest {
@Test
def rowColumnValueTest(): Unit = {
- val spark = SparkSession.builder().master("local").getOrCreate()
+ val row = InternalRow(
+ 1,
+ CatalystTypeConverters.convertToCatalyst(Date.valueOf("2023-09-08")),
+ CatalystTypeConverters.convertToCatalyst(Timestamp.valueOf("2023-09-08 17:00:00")),
+ ArrayData.toArrayData(Array(1, 2, 3)),
+ CatalystTypeConverters.convertToCatalyst(Map[String, String]("a" -> "1")),
+ InternalRow(UTF8String.fromString("a"), 1)
+ )
- val rdd = spark.sparkContext.parallelize(Seq(
- Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3),
- Map[String, String]("a" -> "1"), Row("a", 1))
- ))
- val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType)
+ val schema = new StructType().add("c1", IntegerType)
.add("c2", DateType)
.add("c3", TimestampType)
.add("c4", ArrayType.apply(IntegerType))
.add("c5", MapType.apply(StringType, StringType))
- .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType)))))
-
- val schema = df.schema
-
- df.queryExecution.toRdd.foreach(row => {
-
- val fields = schema.fields
- Assert.assertEquals(1, SchemaUtils.rowColumnValue(row, 0, fields(0).dataType))
- Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, fields(1).dataType))
- Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, fields(2).dataType))
- Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, fields(3).dataType))
- Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
- Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, fields(5).dataType))
-
- })
+ .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType))))
+
+ Assert.assertEquals(1, SchemaUtils.rowColumnValue(row, 0, schema.fields(0).dataType))
+ Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, schema.fields(1).dataType))
+ Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, schema.fields(2).dataType))
+ Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, schema.fields(3).dataType))
+ Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, schema.fields(4).dataType))
+ Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, schema.fields(5).dataType))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]