This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b236a3  [CARBONDATA-3893] [IUD] Fix getting block name in compacted segment with dot for horizontal compaction delta files
3b236a3 is described below

commit 3b236a364c7feba311b56a82f89737b0416c05b9
Author: Zhangshunyu <zhangshunyu1...@126.com>
AuthorDate: Thu Jul 9 11:03:32 2020 +0800

    [CARBONDATA-3893] [IUD] Fix getting block name in compacted segment with dot for horizontal compaction delta files
    
    Why is this PR needed?
    Currently, during horizontal compaction, the driver lists the delta files
    in a segment in order to merge them. This comparison uses the block name,
    but the current code only handles blocks in normal segments, not blocks in
    compacted segments. For example, for XXXX-12.2-time.deletedelta the block
    name should be XXXX-12.2, not XXXX; using XXXX fails to list the delta
    files, so horizontal compaction does not merge them.
    
    What changes were proposed in this PR?
    Correct the parsing of the block name.
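
    As a minimal sketch of the change (the file name below is illustrative and
    XXXX is a placeholder block name assumed to contain no further '-' or '.'):

        // delete delta file written for a block of compacted segment 12.2
        String fileName = "XXXX-12.2-1594263812000.deletedelta";
        // old parsing: cutting at the first '.' drops the ".2" of the compacted
        // segment id, so the derived block name is just "XXXX"
        String firstPart = fileName.substring(0, fileName.indexOf('.'));
        String oldBlkName = firstPart.substring(0, firstPart.lastIndexOf("-"));
        // new parsing: cut at the last '-' (before the timestamp), which keeps
        // the compacted segment id and yields "XXXX-12.2"
        String newBlkName = fileName.substring(0, fileName.lastIndexOf("-"));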
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3832
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  23 ++--
 .../statusmanager/SegmentUpdateStatusManager.java  |   7 +-
 .../spark/testsuite/merge/MergeTestCase.scala      | 152 ++++++++++++++++++++-
 3 files changed, 167 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index c717117..e915c66 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -471,6 +471,15 @@ public class CarbonUpdateUtil {
     LoadMetadataDetails[] details =
         SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
 
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
+    SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
+    // hold all the segments updated so that we can check the delta files in them, no need to
+    // check the others.
+    Set<String> updatedSegments = new HashSet<>();
+    for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
+      updatedSegments.add(updateDetails.getSegmentName());
+    }
+
     String validUpdateStatusFile = "";
 
     boolean isAbortedFile = true;
@@ -479,22 +488,18 @@ public class CarbonUpdateUtil {
 
     List<Segment> segmentFilesToBeUpdated = new ArrayList<>();
 
+    // take the update status file name from 0th segment.
+    validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
     // scan through each segment.
-
     for (LoadMetadataDetails segment : details) {
-
-      // take the update status file name from 0th segment.
-      validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-
       // if this segment is valid then only we will go for delta file deletion.
      // if the segment is mark for delete or compacted then any way it will get deleted.
-
       if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
              || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-
        // when there is no update operations done on table, then no need to go ahead. So
         // just check the update delta start timestamp and proceed if not empty
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()) {
+        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
+                || updatedSegments.contains(segment.getLoadName())) {
           // take the list of files from this segment.
           String segmentPath = CarbonTablePath.getSegmentPath(
              table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
@@ -503,8 +508,6 @@ public class CarbonUpdateUtil {
           CarbonFile[] allSegmentFiles = segDir.listFiles();
 
          // scan through the segment and find the carbondatafiles and index files.
-          SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
-
           boolean updateSegmentFile = false;
           // deleting of the aborted file scenario.
          if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index cd83457..d547c3d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -448,10 +448,9 @@ public class SegmentUpdateStatusManager {
           @Override
           public boolean accept(CarbonFile pathName) {
             String fileName = pathName.getName();
-            if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
-                && pathName.getSize() > 0) {
-              String firstPart = fileName.substring(0, fileName.indexOf('.'));
-              String blkName = firstPart.substring(0, firstPart.lastIndexOf("-"));
+            if (pathName.getSize() > 0
+                && fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+              String blkName = fileName.substring(0, fileName.lastIndexOf("-"));
              long timestamp =
                  Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
              if (blockName.equals(blkName) && (Long.compare(timestamp, deltaEndTimeStamp) <= 0)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 6ea702d..d068507 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -17,12 +17,16 @@
 
 package org.apache.carbondata.spark.testsuite.merge
 
-import scala.collection.JavaConverters._
+import java.io.File
 import java.sql.Date
+import java.time.LocalDateTime
 
+import scala.collection.JavaConverters._
+import scala.util.Random
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonSession._
@@ -721,6 +725,150 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
   }
 
+  case class Target (id: Int, value: String, remark: String, mdt: String)
+  case class Change (id: Int, value: String, change_type: String, mdt: String)
+  private val numInitialRows = 10
+  private val numInsertPerBatch = 2
+  private val numUpdatePerBatch = 5
+  private val numDeletePerBatch = 0
+  private val numBatch = 30
+  private val random = new Random()
+  private val values =
+    (1 to 100).map { x =>
+      random.nextString(10)
+    }
+  private def pickValue = values(random.nextInt(values.size))
+  private val currentIds = new java.util.ArrayList[Int](numInitialRows * 2)
+  private def getId(index: Int) = currentIds.get(index)
+  private def getAndRemoveId(index: Int) = currentIds.remove(index)
+  private def addId(id: Int) = currentIds.add(id)
+  private def removeId(index: Int) = currentIds.remove(index)
+  private def numOfIds = currentIds.size
+  private def maxId: Int = currentIds.asScala.max
+  private val INSERT = "I"
+  private val UPDATE = "U"
+  private val DELETE = "D"
+
+  private def generateRowsForInsert(sparkSession: SparkSession) = {
+    // data for insert to the target table
+    val insertRows = (maxId + 1 to maxId + numInsertPerBatch).map { x =>
+      addId(x)
+      Change(x, pickValue, INSERT, LocalDateTime.now().toString)
+    }
+    sparkSession.createDataFrame(insertRows)
+  }
+
+  private def generateRowsForDelete(sparkSession: SparkSession) = {
+    val deletedRows = (1 to numDeletePerBatch).map { x =>
+      val idIndex = random.nextInt(numOfIds)
+      Change(getAndRemoveId(idIndex), "", DELETE, LocalDateTime.now().toString)
+    }
+    sparkSession.createDataFrame(deletedRows)
+  }
+
+  private def generateRowsForUpdate(sparkSession: SparkSession) = {
+    val updatedRows = (1 to numUpdatePerBatch).map { x =>
+      val idIndex = random.nextInt(numOfIds)
+      Change(getId(idIndex), pickValue, UPDATE, LocalDateTime.now().toString)
+    }
+    sparkSession.createDataFrame(updatedRows)
+  }
+
+  // generate initial data for target table
+  private def generateTarget(sparkSession: SparkSession): Unit = {
+    val time = timeIt { () =>
+      val insertRows = (1 to numInitialRows).map { x =>
+        addId(x)
+        Target(x, pickValue, "origin", LocalDateTime.now().toString)
+      }
+      val targetData = sparkSession.createDataFrame(insertRows)
+      targetData.repartition(8)
+        .write
+        .format("carbondata")
+        .option("tableName", "target")
+        .option("sort_scope", "global_sort")
+        .option("sort_column", "id")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+  }
+
+  // generate change data
+  private def generateChange(sparkSession: SparkSession): Unit = {
+    val update = generateRowsForUpdate(sparkSession)
+    val delete = generateRowsForDelete(sparkSession)
+    val insert = generateRowsForInsert(sparkSession)
+    // union them so that the change contains IUD
+    update
+      .union(delete)
+      .union(insert)
+      .repartition(8)
+      .write
+      .format("carbondata")
+      .option("tableName", "change")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  private def readTargetData(sparkSession: SparkSession): Dataset[Row] =
+    sparkSession.read
+      .format("carbondata")
+      .option("tableName", "target")
+      .load()
+
+  private def readChangeData(sparkSession: SparkSession): Dataset[Row] =
+    sparkSession.read
+      .format("carbondata")
+      .option("tableName", "change")
+      .load()
+
+  private def timeIt(func: () => Unit): Long = {
+    val start = System.nanoTime()
+    func()
+    System.nanoTime() - start
+  }
+
+  test("test cdc with compaction") {
+    CarbonProperties.getInstance().addProperty("carbon.enable.auto.load.merge", "true")
+    sql("drop table if exists target")
+    sql("drop table if exists change")
+    // prepare target table
+    generateTarget(sqlContext.sparkSession)
+    // Do CDC for N batch
+    (1 to numBatch).foreach { i =>
+      // prepare for change data
+      generateChange(sqlContext.sparkSession)
+      // apply change to target table
+      import org.apache.spark.sql.CarbonSession._
+
+      // find the latest value for each key
+      val latestChangeForEachKey = readChangeData(sqlContext.sparkSession)
+        .selectExpr("id", "struct(mdt, value, change_type) as otherCols" )
+        .groupBy("id")
+        .agg(max("otherCols").as("latest"))
+        .selectExpr("id", "latest.*")
+
+      val target = readTargetData(sqlContext.sparkSession)
+      target.as("A")
+        .merge(latestChangeForEachKey.as("B"), "A.id = B.id")
+        .whenMatched("B.change_type = 'D'")
+        .delete()
+        .whenMatched("B.change_type = 'U'")
+        .updateExpr(
+          Map("id" -> "B.id", "value" -> "B.value", "remark" -> "'updated'", "mdt" -> "B.mdt"))
+        .whenNotMatched("B.change_type = 'I'")
+        .insertExpr(
+          Map("id" -> "B.id", "value" -> "B.value", "remark" -> "'new'", "mdt" -> "B.mdt"))
+        .execute()
+    }
+
+    sql("clean files for table target").collect()
+    assert(getDeleteDeltaFileCount("target", "12.2") == 1)
+    checkAnswer(sql("select count(*) from target"), Seq(Row(70)))
+
+    CarbonProperties.getInstance().addProperty("carbon.enable.auto.load.merge", "false")
+  }
+
   private def getDeleteDeltaFileCount(tableName: String, segment: String): Int = {
     val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
     val path = CarbonTablePath
@@ -734,5 +882,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll {
     sql("drop table if exists order")
+    sql("drop table if exists target")
+    sql("drop table if exists change")
   }
 }
