This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 32c2306 [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE 32c2306 is described below commit 32c2306fd954584991d7e1b883581a1f475e6957 Author: haomarch <marchp...@126.com> AuthorDate: Tue Nov 24 17:59:56 2020 +0800 [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE Why is this PR needed? Currently, when save dataframe with MODE.OVERWRITE, createtable will be triggered. But complex types aren't supported, which weakens the functionality of dataframe save in carbondata format. What changes were proposed in this PR? Add the converter of ARRAY/MAP/STRUCT in CarbonDataFrameWriter.convertToCarbonType Does this PR introduce any user interface change? No Is any new testcase added? Yes This closes #4021 --- .../apache/spark/sql/CarbonDataFrameWriter.scala | 3 ++ .../complexType/TestAllComplexDataType.scala | 58 +++++++++++++++++++++- 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index d07dbb4..a0a871a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -74,6 +74,9 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { case decimal: DecimalType => s"decimal(${decimal.precision}, ${decimal.scale})" case BooleanType => CarbonType.BOOLEAN.getName case BinaryType => CarbonType.BINARY.getName + case ArrayType(elementType, _) => sparkType.simpleString + case StructType(fields) => sparkType.simpleString + case MapType(keyType, valueType, _) => sparkType.simpleString case other => CarbonException.analysisException(s"unsupported type: $other") } } diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala index 041e7ab..d2f4ea1 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala @@ -16,8 +16,13 @@ */ package org.apache.carbondata.integration.spark.testsuite.complexType -import org.apache.spark.sql.DataFrame +import java.sql.{Date, Timestamp} + +import scala.collection.mutable.WrappedArray + +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType, VarcharType} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -716,5 +721,56 @@ class TestAllComplexDataType extends QueryTest with BeforeAndAfterAll { insertData("fileformatTable") checkResults() } + + test("test when dataframe save with complex datatype") { + dropTable("dataframe_complex_carbondata") + dropTable("dataframe_complex_parquet") + + val structureData = Seq( + Row( + "id1", 1, Short.MinValue, 1L, 1.0f, 1.0d, BigDecimal(832.23), + "binary".getBytes, + true, + Timestamp.valueOf("2017-01-01 00:00:00.0"), + Date.valueOf("1990-01-01"), + WrappedArray.make(Array("1")), Map("1"-> "1"), + WrappedArray.make(Array(Map("1"-> "1"))), + Row("x") + ) + ) + + import scala.collection.JavaConverters._ + val schemas = Seq( + StructField("c1", StringType, nullable = false), + StructField("c2", IntegerType, nullable = false), + StructField("c3", ShortType, 
nullable = false), + StructField("c4", LongType, nullable = false), + StructField("c5", FloatType, nullable = false), + StructField("c6", DoubleType, nullable = false), + StructField("c7", DecimalType(18, 2), nullable = false), + StructField("c8", BinaryType, nullable = false), + StructField("c9", BooleanType, nullable = false), + StructField("c10", TimestampType, nullable = false), + StructField("c11", DateType, nullable = false), + StructField("c12", ArrayType(StringType), false), + StructField("c13", MapType(StringType, StringType), false), + StructField("c14", ArrayType(MapType(StringType, StringType)), false), + StructField("c15", StructType(List(StructField("c15_1", StringType, nullable = false)).asJava), false) + ) + + val df = sqlContext.sparkSession.createDataFrame(sqlContext.sparkContext.parallelize(structureData), + StructType(schemas)) + df.write.option("dbName", "default") + .option("tableName", "dataframe_complex_carbondata") + .format("carbondata") + .mode("overwrite") + .save() + + checkAnswer( + sql("select * from dataframe_complex_carbondata"), + structureData + ) + dropTable("dataframe_complex_carbondata") + } } // scalastyle:on lineLength