This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 155a6a3 map and struct type write format (#146)
155a6a3 is described below
commit 155a6a3f94f8b843c85431c3462a69a9452e3f46
Author: gnehil <[email protected]>
AuthorDate: Sun Oct 8 18:30:37 2023 +0800
map and struct type write format (#146)
---
.../org/apache/doris/spark/sql/SchemaUtils.scala | 33 +++++++++++++++++++---
.../apache/doris/spark/sql/SchemaUtilsTest.scala | 21 +++++++++-----
2 files changed, 43 insertions(+), 11 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 86a403f..677cc2e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -176,17 +176,42 @@ private[spark] object SchemaUtils {
val mapData = row.getMap(ordinal)
val keys = mapData.keyArray()
val values = mapData.valueArray()
+ val sb = StringBuilder.newBuilder
+ sb.append("{")
var i = 0
- val map = mutable.Map[Any, Any]()
while (i < keys.numElements()) {
- map += rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType)
+ rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType)
+ sb.append(quoteData(rowColumnValue(keys, i, mt.keyType), mt.keyType))
+ .append(":").append(quoteData(rowColumnValue(values, i, mt.valueType), mt.valueType))
+ .append(",")
i += 1
}
- map.toMap.asJava
- case st: StructType => row.getStruct(ordinal, st.length)
+ if (i > 0) sb.dropRight(1)
+ sb.append("}").toString
+ case st: StructType =>
+ val structData = row.getStruct(ordinal, st.length)
+ val sb = StringBuilder.newBuilder
+ sb.append("{")
+ var i = 0
+ while (i < structData.numFields) {
+ val field = st.get(i)
+ sb.append(s""""${field.name}":""")
+ .append(quoteData(rowColumnValue(structData, i, field.dataType), field.dataType))
+ .append(",")
+ i += 1
+ }
+ if (i > 0) sb.dropRight(1)
+ sb.append("}").toString
case _ => throw new DorisException(s"Unsupported spark type: ${dataType.typeName}")
}
}
+ private def quoteData(value: Any, dataType: DataType): Any = {
+ dataType match {
+ case StringType | TimestampType | DateType => s""""$value""""
+ case _ => value
+ }
+ }
+
}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
index e3868cb..7e6e5f5 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
@@ -17,11 +17,11 @@
package org.apache.doris.spark.sql
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
import org.junit.{Assert, Ignore, Test}
import java.sql.{Date, Timestamp}
-import scala.collection.JavaConverters._
@Ignore
class SchemaUtilsTest {
@@ -31,9 +31,16 @@ class SchemaUtilsTest {
val spark = SparkSession.builder().master("local").getOrCreate()
- val df = spark.createDataFrame(Seq(
- (1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3), Map[String, String]("a" -> "1"))
- )).toDF("c1", "c2", "c3", "c4", "c5")
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3),
+ Map[String, String]("a" -> "1"), Row("a", 1))
+ ))
+ val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType)
+ .add("c2", DateType)
+ .add("c3", TimestampType)
+ .add("c4", ArrayType.apply(IntegerType))
+ .add("c5", MapType.apply(StringType, StringType))
+ .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType)))))
val schema = df.schema
@@ -44,8 +51,8 @@ class SchemaUtilsTest {
Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, fields(1).dataType))
Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, fields(2).dataType))
Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, fields(3).dataType))
- println(SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
- Assert.assertEquals(Map("a" -> "1").asJava, SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
+ Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
+ Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, fields(5).dataType))
})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]