This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 70fa1da  [CARBONDATA-4034] Reduce the time consumed by Horizontal Compaction for update
70fa1da is described below

commit 70fa1da59dd13a8e46565abfeddeaed096ed593d
Author: shenjiayu17 <[email protected]>
AuthorDate: Tue Oct 13 11:02:24 2020 +0800

    [CARBONDATA-4034] Reduce the time consumed by Horizontal Compaction for update
    
    Why is this PR needed?
    The horizontal compaction flow becomes too slow when an update involves many
    segments (or many blocks), so this PR analyzes the flow and optimizes its
    time-consuming steps.
    
    What changes were proposed in this PR?
    1. In performDeleteDeltaCompaction, optimize the method
    getSegListIUDCompactionQualified.
    2. Combine two traversals of the segments that performed the same processing.
    3. Form the delete delta file names directly instead of performing a listFiles
    operation (see the sketch below).
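
    To illustrate change 3, here is a minimal, self-contained sketch (not the
    actual CarbonData code) of forming delete delta file paths from the
    timestamps tracked in the segment update metadata instead of listing the
    segment directory. The "<blockName>-<timestamp>.deletedelta" naming follows
    the diff below; the class name, method name, and example path are
    hypothetical:

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.HashSet;
        import java.util.List;
        import java.util.Set;

        public class DeltaPathSketch {
          // Forms "<segmentPath>/<blockName>-<timestamp>.deletedelta" for each
          // tracked timestamp, so no listFiles call is needed to locate them.
          static List<String> formDeleteDeltaPaths(String segmentPath,
              String blockName, Set<String> deltaFileTimestamps) {
            List<String> paths = new ArrayList<>();
            for (String timestamp : deltaFileTimestamps) {
              paths.add(segmentPath + "/" + blockName + "-" + timestamp
                  + ".deletedelta");
            }
            return paths;
          }

          public static void main(String[] args) {
            Set<String> stamps = new HashSet<>(Arrays.asList("1481084721319"));
            // prints [/store/db/t/Fact/Part0/Segment_0/part-0-0-1481084721319.deletedelta]
            System.out.println(formDeleteDeltaPaths(
                "/store/db/t/Fact/Part0/Segment_0", "part-0-0", stamps));
          }
        }

    Constructing the names directly removes one filesystem scan per block, which
    is the kind of per-block cost this PR targets when a table has many segments
    or blocks.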
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3986
---
 .../statusmanager/SegmentUpdateStatusManager.java  |  59 ++++++------
 .../command/mutation/HorizontalCompaction.scala    |   6 ++
 .../iud/HorizontalCompactionTestCase.scala         |  94 +++++++++++++++++++
 .../processing/merger/CarbonDataMergerUtil.java    | 104 ++++++---------------
 4 files changed, 156 insertions(+), 107 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 8e12c2d..4bf8ba1 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -415,44 +415,39 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * Return all delta file for a block.
-   * @param segmentId
-   * @param blockName
-   * @return
+   * Get all delete delta files of the specified block in the given segment.
+   * Actually, the delete delta file names are generated from each SegmentUpdateDetails.
+   *
+   * @param segment the segment in which to find the block and its delete delta files
+   * @param blockName the specified block of the segment
+   * @return the delete delta file list of the block
    */
-  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
+  public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName) {
+    List<String> deleteDeltaFileList = new ArrayList<>();
     String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), segmentId.getSegmentNo());
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath);
+        identifier.getTablePath(), segment.getSegmentNo());
+
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
-          (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
-          && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
-        final long deltaStartTimestamp =
-            getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
-        final long deltaEndTimeStamp =
-            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
-
-        return segDir.listFiles(new CarbonFileFilter() {
-
-          @Override
-          public boolean accept(CarbonFile pathName) {
-            String fileName = pathName.getName();
-            if (pathName.getSize() > 0
-                && fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
-              String blkName = fileName.substring(0, fileName.lastIndexOf("-"));
-              long timestamp =
-                  Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
-              return blockName.equals(blkName) && timestamp <= deltaEndTimeStamp
-                  && timestamp >= deltaStartTimestamp;
-            }
-            return false;
-          }
-        });
+          (block.getSegmentName().equalsIgnoreCase(segment.getSegmentNo())) &&
+          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
+        Set<String> deltaFileTimestamps = block.getDeltaFileStamps();
+        if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) {
+          deltaFileTimestamps.forEach(timestamp -> deleteDeltaFileList.add(
+              CarbonUpdateUtil.getDeleteDeltaFilePath(segmentPath, blockName, timestamp)));
+        } else {
+          // when deltaFileTimestamps is null, there is only one delta file
+          // and the SegmentUpdateDetails has the same start and end timestamp,
+          // so just take one of them to form the delete delta file name
+          final long deltaEndTimeStamp =
+              getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+          deleteDeltaFileList.add(CarbonUpdateUtil.getDeleteDeltaFilePath(
+              segmentPath, blockName, String.valueOf(deltaEndTimeStamp)));
+        }
+        return deleteDeltaFileList;
       }
     }
-    return null;
+    return deleteDeltaFileList;
   }
 
   /**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index e99045b..747885e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -130,6 +130,9 @@ object HorizontalCompaction {
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"The segment list for Horizontal Update Compaction is $validSegList")
+    }
 
     if (validSegList.size() == 0) {
       return
@@ -177,6 +180,9 @@ object HorizontalCompaction {
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"The segment list for Horizontal Delete Compaction is $deletedBlocksList")
+    }
 
     if (deletedBlocksList.size() == 0) {
       return
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
index 5852310..cfe3b17 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
@@ -432,6 +432,100 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
   }
 
+  test("test IUD Horizontal Compaction after updating without compaction") {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE, "false")
+    sql("""drop database if exists iud4 cascade""")
+    sql("""create database iud4""")
+    sql("""use iud4""")
+    sql("""create table dest4 (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest4""")
+    val carbonTable = CarbonEnv.getCarbonTable(Some("iud4"), "dest4")(sqlContext.sparkSession)
+    val identifier = carbonTable.getAbsoluteTableIdentifier()
+    val dataFilesDir = CarbonTablePath.getSegmentPath(identifier.getTablePath, "0")
+    val carbonFile = FileFactory.getCarbonFile(dataFilesDir)
+
+    // update a different row in the same file three times
+    sql("""update dest4 set (c1) = ('update_1') where c2 = 1""").collect()
+    sql("""update dest4 set (c1) = ('update_2') where c2 = 2""").collect()
+    sql("""update dest4 set (c1) = ('update_3') where c2 = 3""").collect()
+
+    // after updating without compaction three times, there should be three '.deletedelta' files
+    var deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    assert(deleteDeltaFiles.length == 3)
+
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE, "true")
+
+    // update with compaction
+    sql("""update dest4 set (c1) = ('update_4') where c2 = 4""").collect()
+    sql("""update dest4 set (c1) = ('update_5') where c2 = 5""").collect()
+    sql("""update dest4 set (c1) = ('update_6') where c2 = 6""").collect()
+
+    checkAnswer(
+      sql("select c1 from dest4 order by c2"),
+      Seq(Row("update_1"),
+        Row("update_2"),
+        Row("update_3"),
+        Row("update_4"),
+        Row("update_5"),
+        Row("update_6"),
+        Row("g"),
+        Row("h"),
+        Row("i"),
+        Row("j"))
+    )
+    sql("""drop table dest4""")
+  }
+
+  test("test IUD Horizontal Compaction when CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION >= 3") {
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION, "3")
+    sql("""drop database if exists iud4 cascade""")
+    sql("""create database iud4""")
+    sql("""use iud4""")
+    sql("""create table dest4 (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest4""")
+    val carbonTable = CarbonEnv.getCarbonTable(Some("iud4"), "dest4")(sqlContext.sparkSession)
+    val identifier = carbonTable.getAbsoluteTableIdentifier()
+    val dataFilesDir = CarbonTablePath.getSegmentPath(identifier.getTablePath, "0")
+    val carbonFile = FileFactory.getCarbonFile(dataFilesDir)
+
+    sql("""update dest4 set (c1) = ('update_1') where c2 = 1""").collect()
+    sql("""update dest4 set (c1) = ('update_2') where c2 = 2""").collect()
+    sql("""update dest4 set (c1) = ('update_3') where c2 = 3""").collect()
+
+    // after the first three updates, the '.deletedelta' file count is less than
+    // or equal to the threshold, so there is no horizontal compaction
+    var deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    assert(deleteDeltaFiles.length == 3)
+
+    sql("""update dest4 set (c1) = ('update_4') where c2 = 4""").collect()
+
+    // there is horizontal compaction at the fourth update:
+    // three '.deletedelta' files from the previous update operations,
+    // one '.deletedelta' file from this update operation,
+    // and one '.deletedelta' file from the horizontal compaction,
+    // so there must be five '.deletedelta' files
+    deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    assert(deleteDeltaFiles.length == 5)
+
+    checkAnswer(
+      sql("select c1 from dest4 order by c2"),
+      Seq(Row("update_1"),
+        Row("update_2"),
+        Row("update_3"),
+        Row("update_4"),
+        Row("e"),
+        Row("f"),
+        Row("g"),
+        Row("h"),
+        Row("i"),
+        Row("j"))
+    )
+    sql("""drop table dest4""")
+  }
+
   override def afterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e873eef..f22a0fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1039,22 +1039,10 @@ public final class CarbonDataMergerUtil {
     if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
       int numberDeleteDeltaFilesThreshold =
           CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      List<Segment> deleteSegments = new ArrayList<>();
       for (Segment seg : segments) {
-        if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
-            numberDeleteDeltaFilesThreshold)) {
-          deleteSegments.add(seg);
-        }
-      }
-      if (deleteSegments.size() > 0) {
-        // This Code Block Append the Segname along with the Blocks selected for Merge instead of
-        // only taking the segment name. This will help to parallelize better for each block
-        // in case of Delete Horizontal Compaction.
-        for (Segment segName : deleteSegments) {
-          List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
-              numberDeleteDeltaFilesThreshold);
-          validSegments.addAll(tempSegments);
-        }
+        List<String> segmentNoAndBlocks = checkDeleteDeltaFilesInSeg(seg,
+            segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold);
+        validSegments.addAll(segmentNoAndBlocks);
       }
     } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
       int numberUpdateDeltaFilesThreshold =
@@ -1138,73 +1126,43 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
+   * Check whether the segment passed qualifies for IUD delete delta compaction or not,
+   * i.e., if the number of delete delta files present in the segment is more than
+   * numberDeltaFilesThreshold, this segment will be selected.
    *
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
+   * @param seg segment to be checked
+   * @param segmentUpdateStatusManager manager of segment and block update details
+   * @param numberDeltaFilesThreshold threshold of delete delta files
+   * @return the qualified block list of the segment
    */
-  private static boolean checkDeleteDeltaFilesInSeg(Segment seg,
+  private static List<String> checkDeleteDeltaFilesInSeg(Segment seg,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
+    List<String> blockLists = new ArrayList<>();
     Set<String> uniqueBlocks = new HashSet<String>();
     List<String> blockNameList =
         segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
+    for (String blockName : blockNameList) {
+      List<String> deleteDeltaFiles =
           segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-      if (null != deleteDeltaFiles) {
+      if (null != deleteDeltaFiles && deleteDeltaFiles.size() > numberDeltaFilesThreshold) {
         // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
         // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
         // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
         // Spill Over Blocks will choose files with unique taskID.
-        for (CarbonFile blocks : deleteDeltaFiles) {
+        for (String deleteDeltaFile : deleteDeltaFiles) {
           // Get Task ID and the Timestamp from the Block name for e.g.
           // part-0-3-1481084721319.carbondata => "3-1481084721319"
-          String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+          String task = CarbonTablePath.DataFileUtil.getTaskNo(deleteDeltaFile);
           String timestamp =
-              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
-          String taskAndTimeStamp = task + "-" + timestamp;
+              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(deleteDeltaFile);
+          String taskAndTimeStamp = task + CarbonCommonConstants.HYPHEN + timestamp;
           uniqueBlocks.add(taskAndTimeStamp);
+          if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
+            blockLists.add(seg.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR + blockName);
+            break;
+          }
         }
-
-        if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
-   */
-
-  private static List<String> getDeleteDeltaFilesInSeg(Segment seg,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
-
-    List<String> blockLists = new ArrayList<>();
-    List<String> blockNameList =
-        segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
-          segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-
-      if (null != deleteDeltaFiles && (deleteDeltaFiles.length > numberDeltaFilesThreshold)) {
-        blockLists.add(seg.getSegmentNo() + "/" + blockName);
       }
     }
     return blockLists;
@@ -1246,20 +1204,16 @@ public final class CarbonDataMergerUtil {
     // set the update status.
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
 
-    CarbonFile[] deleteDeltaFiles =
+    List<String> deleteFilePathList =
         segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);
 
     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
-    List<String> deleteFilePathList = new ArrayList<>();
-    if (null != deleteDeltaFiles && deleteDeltaFiles.length > 0 && null != deleteDeltaFiles[0]
-        .getParentFile()) {
-      String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
-          + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
-
-      for (CarbonFile cFile : deleteDeltaFiles) {
-        deleteFilePathList.add(cFile.getCanonicalPath());
-      }
+    if (deleteFilePathList.size() > 0) {
+      String deleteDeltaFilePath = deleteFilePathList.get(0);
+      String fullBlockFilePath = deleteDeltaFilePath.substring(0,
+          deleteDeltaFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)) +
+          CarbonCommonConstants.FILE_SEPARATOR + destFileName;
 
       CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
       blockDetails.setBlockName(blockName);
