This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b8bdcd  [CARBONDATA-3820] Fix CDC failure when sort columns present 
in source dataframe
5b8bdcd is described below

commit 5b8bdcd44318f938a0588e34417d2afcb76f676e
Author: haomarch <marchp...@126.com>
AuthorDate: Tue May 12 17:39:20 2020 +0800

    [CARBONDATA-3820] Fix CDC failure when sort columns present in source 
dataframe
    
    Why is this PR needed?
    While merging into table with sortcolumns in the CDC Flow. The following 
exception will be throwed:
    "column: id specified in sort columns does not exist in schema".
    Root cause is that we use TBLProperteis with sortcolumns to create the 
TUPLEID_statusOnMerge carbonwriter, in which the sortcolumn、sortscope are all 
useless.
    
    What changes were proposed in this PR?
    remove the sortcolumn property when creating TUPLEID_statusOnMerge 
carbonwriter.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3764
---
 .../apache/carbondata/examples/CDCExample.scala    |   2 +
 .../mutation/merge/CarbonMergeDataSetCommand.scala |   4 +-
 .../spark/testsuite/merge/MergeTestCase.scala      | 111 +++++++++++++++++++--
 3 files changed, 109 insertions(+), 8 deletions(-)

diff --git 
a/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala 
b/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
index cea17c4..304bf93 100644
--- 
a/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
+++ 
b/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
@@ -141,6 +141,8 @@ object CDCExample {
         .write
         .format("carbondata")
         .option("tableName", "target")
+        .option("sort_scope", "global_sort")
+        .option("sort_column", "id")
         .mode(SaveMode.Overwrite)
         .save()
     }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 4e6956b..4cee705 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -186,7 +186,7 @@ case class CarbonMergeDataSetCommand(
     CarbonInsertIntoWithDf(
       databaseNameOp = Some(carbonTable.getDatabaseName),
       tableName = carbonTable.getTableName,
-      options = Map(("fileheader" -> header)),
+      options = Map("fileheader" -> header, "sort_scope" -> "nosort"),
       isOverwriteTable = false,
       dataFrame = loadDF.select(tableCols.map(col): _*),
       updateModel = updateTableModel,
@@ -267,7 +267,7 @@ case class CarbonMergeDataSetCommand(
         StructField(status_on_mergeds, IntegerType)))
     val factory =
       new SparkCarbonFileFormat().prepareWrite(sparkSession, job,
-        
carbonTable.getTableInfo.getFactTable.getTableProperties.asScala.toMap, schema)
+        Map(), schema)
     val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, 
job.getConfiguration)
     
(frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
       mapPartitionsWithIndex { case (index, iter) =>
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index c19a132..9246226 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -83,6 +83,57 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll 
{
     (dwSelframe, odsframe)
   }
 
+  private def initializeGloabalSort = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("sort_scope", "global_sort")
+      .option("sort_columns", "id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", 
"order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
+  private def initializeLocalSort = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("sort_scope", "local_sort")
+      .option("sort_columns", "id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", 
"order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
+  private def initializeNoSortWithSortColumns = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("sort_scope", "no_sort")
+      .option("sort_columns", "id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", 
"order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
   private def initializePartition = {
     val initframe = generateData(10)
     initframe.write
@@ -142,6 +193,54 @@ class MergeTestCase extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
+  test("test basic merge into the globalsort table") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeGloabalSort
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge into the localsort table") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeLocalSort
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge into the nosort table with sortcolumns") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeNoSortWithSortColumns
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
   test("test basic merge update with few mappings with out condition") {
     sql("drop table if exists order")
     val (dwSelframe, odsframe) = initialize
@@ -459,12 +558,12 @@ class MergeTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists customers")
 
     val initframe =
-    sqlContext.sparkSession.createDataFrame(Seq(
-      Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")),
-      Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), null),
-      Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), null),
-      Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null)
-    ).asJava, StructType(Seq(StructField("customerId", IntegerType), 
StructField("address", StringType), StructField("current", BooleanType), 
StructField("effectiveDate", DateType), StructField("endDate", DateType))))
+      sqlContext.sparkSession.createDataFrame(Seq(
+        Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")),
+        Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), 
null),
+        Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), 
null),
+        Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null)
+      ).asJava, StructType(Seq(StructField("customerId", IntegerType), 
StructField("address", StringType), StructField("current", BooleanType), 
StructField("effectiveDate", DateType), StructField("endDate", DateType))))
     initframe.printSchema()
     initframe.write
       .format("carbondata")

Reply via email to