This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d196fbd  [CARBONDATA-4019]Fix CDC merge failure join expression made 
of AND/OR expressions.
d196fbd is described below

commit d196fbdbadb376054afdcbf13181e78c5e599763
Author: akashrn5 <[email protected]>
AuthorDate: Mon Sep 28 20:28:49 2020 +0530

    [CARBONDATA-4019]Fix CDC merge failure join expression made of AND/OR 
expressions.
    
    Why is this PR needed?
    1. In CDC, when the join expression contains AND/OR expressions, the merge 
fails with a CAST exception.
    2. When multiple columns are present in the join condition, not all of 
them are checked against the
    bucket columns when deciding whether to repartition.
    What changes were proposed in this PR?
    1. Instead of directly casting the join expression to an EqualTo expression 
to get the join column, collect all attributes from it.
    2. Check whether all join columns are present in the bucket columns, and 
only then repartition the data.
    
    This closes #3961
---
 .../mutation/merge/CarbonMergeDataSetCommand.scala | 24 +++++++++----
 .../spark/testsuite/merge/MergeTestCase.scala      | 40 ++++++++++++++++++++--
 2 files changed, 54 insertions(+), 10 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 2afdb1a..ec55ea8 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -106,18 +106,28 @@ case class CarbonMergeDataSetCommand(
     // decide join type based on match conditions
     val joinType = decideJoinType
 
-    val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
-      .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
-    // repartition the srsDs, if the target has bucketing and the bucketing 
column and join column
-    // are same
+    val joinColumns = mergeMatches.joinExpr.expr.collect {
+      case unresolvedAttribute: UnresolvedAttribute if 
unresolvedAttribute.nameParts.nonEmpty =>
+        // Let's say the join condition will be something like A.id = B.id, 
then it will be an
+        // EqualTo expression, with left expression as 
UnresolvedAttribute(A.id) and right will
+        // be a Literal(B.id). Since we need the column name here, we can 
directly check the left
+        // which is UnresolvedAttribute. We take nameparts from 
UnresolvedAttribute which is an
+        // ArrayBuffer containing "A" and "id", since "id" is column name, we 
take
+        // nameparts.tail.head which gives us "id" column name.
+        unresolvedAttribute.nameParts.tail.head
+    }.distinct
+
+    // repartition the srsDs, if the target has bucketing and the bucketing 
columns contains join
+    // columns
     val repartitionedSrcDs =
       if (carbonTable.getBucketingInfo != null &&
           carbonTable.getBucketingInfo
             .getListOfColumns
             .asScala
-            .exists(_.getColumnName.equalsIgnoreCase(joinColumn))) {
-      srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges, 
srcDS.col(joinColumn))
-    } else {
+            .map(_.getColumnName).containsSlice(joinColumns)) {
+        srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
+          joinColumns.map(srcDS.col): _*)
+      } else {
       srcDS
     }
     // Add the getTupleId() udf to get the tuple id to generate delete delta.
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 56b98a3..829c815 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -91,8 +91,11 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll 
{
     (dwSelframe, odsframe)
   }
 
-  private def initializeWithBucketing = {
-    sql("create table order(id string, name string, c_name string, quantity 
int, price int, state int) stored as carbondata 
tblproperties('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='id')")
+  private def initializeWithBucketing(bucketingColumns: Seq[String]) = {
+    sql(s"create table order(id string, name string, c_name string, quantity 
int, price int, " +
+        s"state int) stored as carbondata tblproperties('BUCKET_NUMBER'='10', 
'BUCKET_COLUMNS'='${
+      bucketingColumns.mkString(",")
+    }')")
     initialize
   }
 
@@ -844,7 +847,7 @@ class MergeTestCase extends QueryTest with 
BeforeAndAfterAll {
 
   test("test merge update and insert with condition and expression and delete 
action with target table as bucketing") {
     sql("drop table if exists order")
-    val (dwSelframe, odsframe) = initializeWithBucketing
+    val (dwSelframe, odsframe) = initializeWithBucketing(Seq("id"))
 
     var matches = Seq.empty[MergeMatch]
     val updateMap = Map(col("id") -> col("A.id"),
@@ -872,6 +875,37 @@ class MergeTestCase extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select price from order where id = 'newid1'"), 
Seq(Row(7500)))
   }
 
+  test("test merge with target table as multiple bucketing columns and join 
columns") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeWithBucketing(Seq("id", "quantity"))
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+    matches ++= 
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      
MergeDataSetMatches((col("A.id").equalTo(col("B.id"))).and(col("A.quantity").equalTo(col(
+        "B.quantity"))), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), 
Seq(Row(7500)))
+  }
+
   case class Target (id: Int, value: String, remark: String, mdt: String)
   case class Change (id: Int, value: String, change_type: String, mdt: String)
   private val numInitialRows = 10

Reply via email to