This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d196fbd [CARBONDATA-4019] Fix CDC merge failure when the join expression is made of AND/OR expressions
d196fbd is described below
commit d196fbdbadb376054afdcbf13181e78c5e599763
Author: akashrn5 <[email protected]>
AuthorDate: Mon Sep 28 20:28:49 2020 +0530
[CARBONDATA-4019] Fix CDC merge failure when the join expression is made of AND/OR expressions
Why is this PR needed?
1. In CDC, when the join expression contains AND/OR expressions, the merge failed with a cast exception.
2. When multiple columns are present in the join condition, not all of them are checked against the bucket columns before repartitioning.
What changes were proposed in this PR?
1. Instead of directly casting the join expression to EqualTo to get the join column, collect all attributes from the expression tree (see the sketch after this list).
2. Check whether all join columns are present in the bucket columns, and only then repartition the data.
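For illustration, here is a minimal sketch of the collection step, assuming Spark Catalyst's TreeNode.collect and UnresolvedAttribute APIs (the helper name joinColumnNames is hypothetical):

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Collect every column name referenced anywhere in the join expression,
    // whether it is a bare EqualTo or a tree of And/Or nodes.
    def joinColumnNames(joinExpr: Expression): Seq[String] =
      joinExpr.collect {
        // nameParts for "A.id" is Seq("A", "id"); tail.head drops the table
        // alias and keeps the column name, matching the fix below.
        case attr: UnresolvedAttribute if attr.nameParts.nonEmpty =>
          attr.nameParts.tail.head
      }.distinct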
This closes #3961
---
.../mutation/merge/CarbonMergeDataSetCommand.scala | 24 +++++++++----
.../spark/testsuite/merge/MergeTestCase.scala | 40 ++++++++++++++++++++--
2 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 2afdb1a..ec55ea8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -106,18 +106,28 @@ case class CarbonMergeDataSetCommand(
// decide join type based on match conditions
val joinType = decideJoinType
- val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
- .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
- // repartition the srcDS, if the target has bucketing and the bucketing column and join column
- // are same
+ val joinColumns = mergeMatches.joinExpr.expr.collect {
+ case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
+ // Let's say the join condition is something like A.id = B.id; then it will be an
+ // EqualTo expression, with the left expression as UnresolvedAttribute(A.id) and the
+ // right as UnresolvedAttribute(B.id). Since we need the column names here, we collect
+ // all the UnresolvedAttributes. We take nameParts from each UnresolvedAttribute, which
+ // is a sequence containing "A" and "id"; since "id" is the column name, we take
+ // nameParts.tail.head, which gives us the "id" column name.
+ unresolvedAttribute.nameParts.tail.head
+ }.distinct
+
+ // repartition the srcDS, if the target has bucketing and the bucketing columns contain the
+ // join columns
val repartitionedSrcDs =
if (carbonTable.getBucketingInfo != null &&
carbonTable.getBucketingInfo
.getListOfColumns
.asScala
- .exists(_.getColumnName.equalsIgnoreCase(joinColumn))) {
- srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges, srcDS.col(joinColumn))
- } else {
+ .map(_.getColumnName).containsSlice(joinColumns)) {
+ srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
+ joinColumns.map(srcDS.col): _*)
+ } else {
srcDS
}
// Add the getTupleId() udf to get the tuple id to generate delete delta.
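A note on the containment check above: unlike the replaced equalsIgnoreCase comparison, containsSlice is case-sensitive and tests for a contiguous, ordered subsequence rather than simple set membership. A quick sketch with hypothetical column lists:

    val bucketCols = Seq("id", "quantity", "state")
    bucketCols.containsSlice(Seq("id", "quantity")) // true: contiguous, in order
    bucketCols.containsSlice(Seq("quantity", "id")) // false: order differs
    bucketCols.containsSlice(Seq("id", "state"))    // false: not contiguous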
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 56b98a3..829c815 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -91,8 +91,11 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
(dwSelframe, odsframe)
}
- private def initializeWithBucketing = {
- sql("create table order(id string, name string, c_name string, quantity
int, price int, state int) stored as carbondata
tblproperties('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='id')")
+ private def initializeWithBucketing(bucketingColumns: Seq[String]) = {
+ sql(s"create table order(id string, name string, c_name string, quantity
int, price int, " +
+ s"state int) stored as carbondata tblproperties('BUCKET_NUMBER'='10',
'BUCKET_COLUMNS'='${
+ bucketingColumns.mkString(",")
+ }')")
initialize
}
@@ -844,7 +847,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
test("test merge update and insert with condition and expression and delete
action with target table as bucketing") {
sql("drop table if exists order")
- val (dwSelframe, odsframe) = initializeWithBucketing
+ val (dwSelframe, odsframe) = initializeWithBucketing(Seq("id"))
var matches = Seq.empty[MergeMatch]
val updateMap = Map(col("id") -> col("A.id"),
@@ -872,6 +875,37 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
}
+ test("test merge with target table as multiple bucketing columns and join
columns") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initializeWithBucketing(Seq("id", "quantity"))
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+ matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches((col("A.id").equalTo(col("B.id"))).and(col("A.quantity").equalTo(col(
+ "B.quantity"))), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ }
+
case class Target (id: Int, value: String, remark: String, mdt: String)
case class Change (id: Int, value: String, change_type: String, mdt: String)
private val numInitialRows = 10