This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c2306  [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE
32c2306 is described below

commit 32c2306fd954584991d7e1b883581a1f475e6957
Author: haomarch <marchp...@126.com>
AuthorDate: Tue Nov 24 17:59:56 2020 +0800

    [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE
    
    Why is this PR needed?
    Currently, when a DataFrame is saved with MODE.OVERWRITE, a CREATE TABLE
    is triggered, but complex types are not supported, which weakens the
    functionality of saving DataFrames in the CarbonData format.
    
    What changes were proposed in this PR?
    Add converters for ARRAY/MAP/STRUCT types in
    CarbonDataFrameWriter.convertToCarbonType.
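
    For context, each new branch simply forwards Spark's DataType.simpleString,
    which renders a nested type as a lowercase DDL string that the generated
    CREATE TABLE can consume. A minimal, self-contained sketch (plain Spark
    types only, no CarbonData needed) illustrating this mapping:

        import org.apache.spark.sql.types._

        object SimpleStringDemo extends App {
          // An array of strings renders as "array<string>"
          println(ArrayType(StringType).simpleString)
          // A map renders as "map<string,int>"
          println(MapType(StringType, IntegerType).simpleString)
          // A struct renders as "struct<a:int,b:string>"
          println(StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", StringType))).simpleString)
        }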
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4021
---
 .../apache/spark/sql/CarbonDataFrameWriter.scala   |  3 ++
 .../complexType/TestAllComplexDataType.scala       | 58 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index d07dbb4..a0a871a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -74,6 +74,9 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
      case decimal: DecimalType => s"decimal(${decimal.precision}, ${decimal.scale})"
       case BooleanType => CarbonType.BOOLEAN.getName
       case BinaryType => CarbonType.BINARY.getName
+      case ArrayType(elementType, _) => sparkType.simpleString
+      case StructType(fields) => sparkType.simpleString
+      case MapType(keyType, valueType, _) => sparkType.simpleString
      case other => CarbonException.analysisException(s"unsupported type: $other")
     }
   }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
index 041e7ab..d2f4ea1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
@@ -16,8 +16,13 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.complexType
 
-import org.apache.spark.sql.DataFrame
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.WrappedArray
+
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType, VarcharType}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -716,5 +721,56 @@ class TestAllComplexDataType extends QueryTest with BeforeAndAfterAll {
     insertData("fileformatTable")
     checkResults()
   }
+
+  test("test when dataframe save with complex datatype") {
+    dropTable("dataframe_complex_carbondata")
+    dropTable("dataframe_complex_parquet")
+
+    val structureData = Seq(
+      Row(
+        "id1", 1, Short.MinValue, 1L, 1.0f, 1.0d, BigDecimal(832.23),
+        "binary".getBytes,
+        true,
+        Timestamp.valueOf("2017-01-01 00:00:00.0"),
+        Date.valueOf("1990-01-01"),
+        WrappedArray.make(Array("1")), Map("1"-> "1"),
+        WrappedArray.make(Array(Map("1"-> "1"))),
+        Row("x")
+      )
+    )
+
+    import scala.collection.JavaConverters._
+    val schemas = Seq(
+        StructField("c1", StringType, nullable = false),
+        StructField("c2", IntegerType, nullable = false),
+        StructField("c3", ShortType, nullable = false),
+        StructField("c4", LongType, nullable = false),
+        StructField("c5", FloatType, nullable = false),
+        StructField("c6", DoubleType, nullable = false),
+        StructField("c7", DecimalType(18, 2), nullable = false),
+        StructField("c8", BinaryType, nullable = false),
+        StructField("c9", BooleanType, nullable = false),
+        StructField("c10", TimestampType, nullable = false),
+        StructField("c11", DateType, nullable = false),
+        StructField("c12", ArrayType(StringType), false),
+        StructField("c13", MapType(StringType, StringType), false),
+        StructField("c14", ArrayType(MapType(StringType, StringType)), false),
+        StructField("c15", StructType(List(StructField("c15_1", StringType, 
nullable = false)).asJava), false)
+      )
+
+    val df = sqlContext.sparkSession.createDataFrame(sqlContext.sparkContext.parallelize(structureData),
+      StructType(schemas))
+    df.write.option("dbName", "default")
+      .option("tableName", "dataframe_complex_carbondata")
+      .format("carbondata")
+      .mode("overwrite")
+      .save()
+
+    checkAnswer(
+      sql("select * from dataframe_complex_carbondata"),
+      structureData
+    )
+    dropTable("dataframe_complex_carbondata")
+  }
 }
 // scalastyle:on lineLength
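
For readers who want to exercise the new path end to end, the spark-shell
style sketch below mirrors what the added test covers: saving a DataFrame
with array, map, and struct columns in overwrite mode. It assumes a
SparkSession already configured for CarbonData; the table name demo_complex
is hypothetical.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    // Assumes a SparkSession wired up with CarbonData (setup not shown).
    val spark: SparkSession = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // One row with an array, a map, and a struct column; before this
    // patch, mode("overwrite") failed to create a table for this schema.
    val df = Seq((1, Seq("a", "b"), Map("k" -> "v"), ("x", 2)))
      .toDF("id", "arr_col", "map_col", "struct_col")

    df.write
      .format("carbondata")
      .option("tableName", "demo_complex") // hypothetical table name
      .mode(SaveMode.Overwrite)            // triggers CREATE TABLE internally
      .save()

    spark.sql("SELECT * FROM demo_complex").show()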
