This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 32c2306 [CARBONDATA-4057] Support Complex DataType when Save
DataFrame with MODE.OVERWRITE
32c2306 is described below
commit 32c2306fd954584991d7e1b883581a1f475e6957
Author: haomarch <[email protected]>
AuthorDate: Tue Nov 24 17:59:56 2020 +0800
[CARBONDATA-4057] Support Complex DataType when Save DataFrame with
MODE.OVERWRITE
Why is this PR needed?
Currently, when a DataFrame is saved with MODE.OVERWRITE, a CREATE TABLE is
triggered, but complex types are not supported. This weakens the
functionality of saving a DataFrame in CarbonData format.
What changes were proposed in this PR?
Add conversion of ARRAY/MAP/STRUCT types (mapped to their Spark simpleString
form, e.g. array<string>) in CarbonDataFrameWriter.convertToCarbonType
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4021
---
.../apache/spark/sql/CarbonDataFrameWriter.scala | 3 ++
.../complexType/TestAllComplexDataType.scala | 58 +++++++++++++++++++++-
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index d07dbb4..a0a871a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -74,6 +74,9 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val
dataFrame: DataFrame) {
case decimal: DecimalType => s"decimal(${decimal.precision},
${decimal.scale})"
case BooleanType => CarbonType.BOOLEAN.getName
case BinaryType => CarbonType.BINARY.getName
+ case ArrayType(elementType, _) => sparkType.simpleString
+ case StructType(fields) => sparkType.simpleString
+ case MapType(keyType, valueType, _) => sparkType.simpleString
case other => CarbonException.analysisException(s"unsupported type:
$other")
}
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
index 041e7ab..d2f4ea1 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
@@ -16,8 +16,13 @@
*/
package org.apache.carbondata.integration.spark.testsuite.complexType
-import org.apache.spark.sql.DataFrame
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.WrappedArray
+
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType,
MapType, ShortType, StringType, StructField, StructType, TimestampType,
VarcharType}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -716,5 +721,56 @@ class TestAllComplexDataType extends QueryTest with
BeforeAndAfterAll {
insertData("fileformatTable")
checkResults()
}
+
+ test("test when dataframe save with complex datatype") {
+ dropTable("dataframe_complex_carbondata")
+ dropTable("dataframe_complex_parquet")
+
+ val structureData = Seq(
+ Row(
+ "id1", 1, Short.MinValue, 1L, 1.0f, 1.0d, BigDecimal(832.23),
+ "binary".getBytes,
+ true,
+ Timestamp.valueOf("2017-01-01 00:00:00.0"),
+ Date.valueOf("1990-01-01"),
+ WrappedArray.make(Array("1")), Map("1"-> "1"),
+ WrappedArray.make(Array(Map("1"-> "1"))),
+ Row("x")
+ )
+ )
+
+ import scala.collection.JavaConverters._
+ val schemas = Seq(
+ StructField("c1", StringType, nullable = false),
+ StructField("c2", IntegerType, nullable = false),
+ StructField("c3", ShortType, nullable = false),
+ StructField("c4", LongType, nullable = false),
+ StructField("c5", FloatType, nullable = false),
+ StructField("c6", DoubleType, nullable = false),
+ StructField("c7", DecimalType(18, 2), nullable = false),
+ StructField("c8", BinaryType, nullable = false),
+ StructField("c9", BooleanType, nullable = false),
+ StructField("c10", TimestampType, nullable = false),
+ StructField("c11", DateType, nullable = false),
+ StructField("c12", ArrayType(StringType), false),
+ StructField("c13", MapType(StringType, StringType), false),
+ StructField("c14", ArrayType(MapType(StringType, StringType)), false),
+ StructField("c15", StructType(List(StructField("c15_1", StringType,
nullable = false)).asJava), false)
+ )
+
+ val df =
sqlContext.sparkSession.createDataFrame(sqlContext.sparkContext.parallelize(structureData),
+ StructType(schemas))
+ df.write.option("dbName", "default")
+ .option("tableName", "dataframe_complex_carbondata")
+ .format("carbondata")
+ .mode("overwrite")
+ .save()
+
+ checkAnswer(
+ sql("select * from dataframe_complex_carbondata"),
+ structureData
+ )
+ dropTable("dataframe_complex_carbondata")
+ }
}
// scalastyle:on lineLength