This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 178aacf [CARBONDATA-3902] Fix CDC delete data issue on partition table
178aacf is described below
commit 178aacfbb06842c7fe3213c72855e570d1199f8c
Author: Indhumathi27 <[email protected]>
AuthorDate: Wed Jul 15 17:01:47 2020 +0530
[CARBONDATA-3902] Fix CDC delete data issue on partition table
Why is this PR needed?
When only a delete operation is executed through the CDC merge command on a
partition table, the loadMetaEntry of the segments that have deleted rows is not
updated with the latest tableupdatestatus file name. Subsequent queries then
return wrong results.
What changes were proposed in this PR?
Update the loadMetaEntry of the segments that have deleted rows with the latest
tableupdatestatus file name after the delete operation on the partition table,
if loadDF.count == 0.
NOTE: If the merge has data to be inserted after update and delete (U & D),
CarbonInsertIntoDf will update the tableupdatestatus file name in the load meta
entry during load.
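In essence (a condensed sketch of the change shown in the diff below, using the
same identifiers; not additional code), the fix does the following after the
merge when the table is a Hive partition table:

    // Delete-only merge on a partition table: nothing was loaded (count == 0),
    // so refresh the tableupdatestatus file name of the affected segments here.
    if (carbonTable.isHivePartitionTable && count == 0 && hasDelAction && !tuple._1.isEmpty) {
      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
      CarbonUpdateUtil.updateTableMetadataStatus(
        loadMetaDataDetails.map(detail =>
          new Segment(detail.getMergedLoadName, detail.getSegmentFile)).toSet.asJava,
        carbonTable, trxMgr.getLatestTrx.toString, true, tuple._2.asJava)
    }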
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3846
---
.../mutation/merge/CarbonMergeDataSetCommand.scala | 20 ++++++-
.../spark/testsuite/merge/MergeTestCase.scala | 67 ++++++++++++++++++----
2 files changed, 74 insertions(+), 13 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 7abfc42..fb5b200 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -46,9 +46,12 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccum
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.FailureCauses
/**
@@ -164,7 +167,7 @@ case class CarbonMergeDataSetCommand(
processedRDD)(sparkSession))
loadDF.cache()
- loadDF.count()
+ val count = loadDF.count()
val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
val deltaRdd = sparkSession.read.format("carbon").load(deltaPath).rdd
val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
@@ -175,6 +178,21 @@ case class CarbonMergeDataSetCommand(
LOGGER.error("writing of update status file failed")
throw new CarbonMergeDataSetException("writing of update status file failed")
}
+ if (carbonTable.isHivePartitionTable) {
+ // If load count is 0 and if merge action contains delete operation, update
+ // tableUpdateStatus file name in loadMeta entry
+ if (count == 0 && hasDelAction && !tuple._1.isEmpty) {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
+ .getTableStatusFilePath(carbonTable.getTablePath))
+ CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
+ new Segment(loadMetadataDetail.getMergedLoadName,
+ loadMetadataDetail.getSegmentFile)).toSet.asJava,
+ carbonTable,
+ trxMgr.getLatestTrx.toString,
+ true,
+ tuple._2.asJava)
+ }
+ }
Some(UpdateTableModel(true, trxMgr.getLatestTrx,
executorErrors, tuple._2, true))
} else {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index d068507..8c4725a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -17,7 +17,6 @@
package org.apache.carbondata.spark.testsuite.merge
-import java.io.File
import java.sql.Date
import java.time.LocalDateTime
@@ -626,7 +625,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
}
- test("check the ccd with partition") {
+ test("check the cdc with partition") {
sql("drop table if exists target")
val initframe = sqlContext.sparkSession.createDataFrame(Seq(
@@ -643,7 +642,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
.mode(SaveMode.Overwrite)
.save()
val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
- var ccd =
+ var cdc =
sqlContext.sparkSession.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1), // a was updated and then deleted
@@ -656,15 +655,15 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType))))
- ccd.createOrReplaceTempView("changes")
+ cdc.createOrReplaceTempView("changes")
- ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
+ cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
- target.as("A").merge(ccd.as("B"), "A.key=B.key").
+ target.as("A").merge(cdc.as("B"), "A.key=B.key").
whenMatched("B.deleted=false").
updateExpr(updateMap).
whenNotMatched("B.deleted=false").
@@ -676,7 +675,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
}
- test("check the ccd ") {
+ test("check the cdc ") {
sql("drop table if exists target")
val initframe = sqlContext.sparkSession.createDataFrame(Seq(
@@ -692,7 +691,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
.mode(SaveMode.Overwrite)
.save()
val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
- var ccd =
+ var cdc =
sqlContext.sparkSession.createDataFrame(Seq(
Row("a", "10", false, 0),
Row("a", null, true, 1), // a was updated and then deleted
@@ -705,15 +704,15 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
StructType(Seq(StructField("key", StringType),
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time", IntegerType))))
- ccd.createOrReplaceTempView("changes")
+ cdc.createOrReplaceTempView("changes")
- ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
+ cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
- target.as("A").merge(ccd.as("B"), "A.key=B.key").
+ target.as("A").merge(cdc.as("B"), "A.key=B.key").
whenMatched("B.deleted=false").
updateExpr(updateMap).
whenNotMatched("B.deleted=false").
@@ -725,6 +724,47 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
}
+ test("check the cdc delete with partition") {
+ sql("drop table if exists target")
+
+ val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "0"),
+ Row("a1", "0"),
+ Row("b", "1"),
+ Row("c", "2"),
+ Row("d", "3")
+ ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .option("partitionColumns", "value")
+ .mode(SaveMode.Overwrite)
+ .save()
+ val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
+ var cdc =
+ sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", null, true, 1),
+ Row("a1", null, false, 1),
+ Row("b", null, true, 2),
+ Row("c", null, true, 3),
+ Row("e", "100", false, 6)
+ ).asJava,
+ StructType(Seq(StructField("key", StringType),
+ StructField("newValue", StringType),
+ StructField("deleted", BooleanType), StructField("time", IntegerType))))
+ cdc.createOrReplaceTempView("changes")
+
+ cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
+
+ target.as("A").merge(cdc.as("B"), "A.key=B.key").
+ whenMatched("B.deleted=true").delete().execute()
+
+ assert(getDeleteDeltaFileCount("target", "0") == 1)
+ checkAnswer(sql("select count(*) from target"), Seq(Row(2)))
+ checkAnswer(sql("select * from target order by key"), Seq(Row("a1","0"),Row("d", "3")))
+ }
+
case class Target (id: Int, value: String, remark: String, mdt: String)
case class Change (id: Int, value: String, change_type: String, mdt: String)
private val numInitialRows = 10
@@ -871,8 +911,11 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
private def getDeleteDeltaFileCount(tableName: String, segment: String): Int = {
val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
- val path = CarbonTablePath
+ var path = CarbonTablePath
.getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+ if (table.isHivePartitionTable) {
+ path = table.getAbsoluteTableIdentifier.getTablePath
+ }
val deleteDeltaFiles = FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonCommonConstants
.DELETE_DELTA_FILE_EXT)