This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 70fa1da [CARBONDATA-4034] Improve the time-consuming of Horizontal Compaction for update
70fa1da is described below
commit 70fa1da59dd13a8e46565abfeddeaed096ed593d
Author: shenjiayu17 <[email protected]>
AuthorDate: Tue Oct 13 11:02:24 2020 +0800
    [CARBONDATA-4034] Improve the time-consuming of Horizontal Compaction for update
Why is this PR needed?
    The horizontal compaction flow becomes very slow when an update involves many
    segments (or many blocks), so we analyzed the flow and optimized its most
    time-consuming steps.
What changes were proposed in this PR?
    1. Optimize the method getSegListIUDCompactionQualified in the
       performDeleteDeltaCompaction flow.
    2. Combine the two traversals of the segments that performed the same work
       into a single pass.
    3. Form delete delta file names directly from metadata instead of performing
       a listFiles operation (see the sketch just below).
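
    To make change 3 concrete, here is a minimal, self-contained sketch.
    It is illustrative only: the directory layout, block name, and helper
    names below are assumptions, not CarbonData's real classes. The old
    approach pays one filesystem listing per lookup and filters the result,
    while the new approach derives each path purely from metadata the status
    manager already holds. Delete delta files follow the
    "<blockName>-<timestamp>.deletedelta" pattern, matching the destFileName
    construction later in this commit.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DeleteDeltaPathSketch {
      static final String EXT = ".deletedelta";

      // Old style: one filesystem listing per lookup; the cost grows with the
      // number of files in the segment directory.
      static List<String> byListing(File segmentDir, String blockName) {
        File[] hits = segmentDir.listFiles(
            f -> f.getName().startsWith(blockName + "-") && f.getName().endsWith(EXT));
        List<String> paths = new ArrayList<>();
        if (hits != null) {
          for (File hit : hits) {
            paths.add(hit.getPath());
          }
        }
        return paths;
      }

      // New style: no I/O at all; names are formed from timestamps that the
      // segment update metadata already records.
      static List<String> byConstruction(String segmentPath, String blockName,
          List<String> knownTimestamps) {
        List<String> paths = new ArrayList<>();
        for (String ts : knownTimestamps) {
          paths.add(segmentPath + File.separator + blockName + "-" + ts + EXT);
        }
        return paths;
      }

      public static void main(String[] args) {
        // Hypothetical segment path, block name, and timestamps.
        System.out.println(byConstruction("/store/tbl/Fact/Part0/Segment_0",
            "part-0-0", Arrays.asList("1602558150000", "1602558160000")));
      }
    }

    On object stores or segments with many files, dropping the listFiles call
    removes the dominant cost of qualifying blocks for horizontal compaction.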
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3986
---
.../statusmanager/SegmentUpdateStatusManager.java | 59 ++++++------
.../command/mutation/HorizontalCompaction.scala | 6 ++
.../iud/HorizontalCompactionTestCase.scala | 94 +++++++++++++++++++
.../processing/merger/CarbonDataMergerUtil.java | 104 ++++++---------------
4 files changed, 156 insertions(+), 107 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 8e12c2d..4bf8ba1 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -415,44 +415,39 @@ public class SegmentUpdateStatusManager {
   }

   /**
-   * Return all delta file for a block.
-   * @param segmentId
-   * @param blockName
-   * @return
+   * Get all delete delta files of the block of specified segment.
+   * Actually, delete delta file name is generated from each SegmentUpdateDetails.
+   *
+   * @param segment the segment which is to find block and its delete delta files
+   * @param blockName the specified block of the segment
+   * @return delete delta file list of the block
    */
-  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
+  public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName) {
+    List<String> deleteDeltaFileList = new ArrayList<>();
     String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), segmentId.getSegmentNo());
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath);
+        identifier.getTablePath(), segment.getSegmentNo());
+
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
-          (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
-          && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
-        final long deltaStartTimestamp =
-            getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
-        final long deltaEndTimeStamp =
-            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
-
-        return segDir.listFiles(new CarbonFileFilter() {
-
-          @Override
-          public boolean accept(CarbonFile pathName) {
-            String fileName = pathName.getName();
-            if (pathName.getSize() > 0
-                && fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
-              String blkName = fileName.substring(0, fileName.lastIndexOf("-"));
-              long timestamp =
-                  Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
-              return blockName.equals(blkName) && timestamp <= deltaEndTimeStamp
-                  && timestamp >= deltaStartTimestamp;
-            }
-            return false;
-          }
-        });
+          (block.getSegmentName().equalsIgnoreCase(segment.getSegmentNo())) &&
+          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
+        Set<String> deltaFileTimestamps = block.getDeltaFileStamps();
+        if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) {
+          deltaFileTimestamps.forEach(timestamp -> deleteDeltaFileList.add(
+              CarbonUpdateUtil.getDeleteDeltaFilePath(segmentPath, blockName, timestamp)));
+        } else {
+          // when the deltaFileTimestamps is null, then there is only one delta file
+          // and the SegmentUpdateDetails will have same start and end timestamp,
+          // just take one to form the delete delta file name
+          final long deltaEndTimeStamp =
+              getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+          deleteDeltaFileList.add(CarbonUpdateUtil.getDeleteDeltaFilePath(
+              segmentPath, blockName, String.valueOf(deltaEndTimeStamp)));
+        }
+        return deleteDeltaFileList;
       }
     }
-    return null;
+    return deleteDeltaFileList;
   }

   /**
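
CarbonUpdateUtil.getDeleteDeltaFilePath itself is not shown in this patch; the
sketch below captures its assumed behavior, inferred from the destFileName
construction in CarbonDataMergerUtil further down, together with the invariant
the else-branch above relies on. The real implementation may differ in details.

    public final class DeleteDeltaPathAssumption {

      // Assumed behavior of CarbonUpdateUtil.getDeleteDeltaFilePath, inferred
      // from how destFileName is built later in this commit.
      static String getDeleteDeltaFilePath(String segmentPath, String blockName,
          String timestamp) {
        return segmentPath + "/" + blockName + "-" + timestamp + ".deletedelta";
      }

      public static void main(String[] args) {
        // When getDeltaFileStamps() returns null, the SegmentUpdateDetails
        // carries exactly one delete delta file whose start and end timestamps
        // are equal, so either endpoint reconstructs the file name.
        // The path, block name, and timestamp below are hypothetical.
        System.out.println(getDeleteDeltaFilePath(
            "/store/tbl/Fact/Part0/Segment_0", "part-0-0", "1602558144000"));
      }
    }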
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index e99045b..747885e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -130,6 +130,9 @@ object HorizontalCompaction {
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"The segment list for Horizontal Update Compaction is $validSegList")
+    }

     if (validSegList.size() == 0) {
       return
@@ -177,6 +180,9 @@ object HorizontalCompaction {
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"The segment list for Horizontal Update Compaction is $deletedBlocksList")
+    }

     if (deletedBlocksList.size() == 0) {
       return
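
Note the guard around each debug statement: the Scala string interpolation
renders the whole segment list eagerly, so checking isDebugEnabled first avoids
that cost on the hot path when debug logging is off. The same pattern in plain
Java (a generic sketch, not code from this patch):

    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class GuardedDebugLogging {
      private static final Logger LOG = LoggerFactory.getLogger(GuardedDebugLogging.class);

      static void logSegments(List<String> validSegList) {
        // Concatenating a large list into the message is wasted work unless
        // debug logging is actually enabled, so check the level first.
        if (LOG.isDebugEnabled()) {
          LOG.debug("The segment list for Horizontal Update Compaction is " + validSegList);
        }
      }
    }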
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
index 5852310..cfe3b17 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
@@ -432,6 +432,100 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
   }
+ test("test IUD Horizontal Compaction after updating without compaction") {
+ CarbonProperties.getInstance().
+ addProperty(CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE,
"false")
+ sql("""drop database if exists iud4 cascade""")
+ sql("""create database iud4""")
+ sql("""use iud4""")
+ sql("""create table dest4 (c1 string,c2 int,c3 string,c5 string) STORED AS
carbondata""")
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table
dest4""")
+ val carbonTable = CarbonEnv.getCarbonTable(Some("iud4"),
"dest4")(sqlContext.sparkSession)
+ val identifier = carbonTable.getAbsoluteTableIdentifier()
+ val dataFilesDir = CarbonTablePath.getSegmentPath(identifier.getTablePath,
"0")
+ val carbonFile = FileFactory.getCarbonFile(dataFilesDir)
+
+ // update different row in same file three times
+ sql("""update dest4 set (c1) = ('update_1') where c2 = 1""").collect()
+ sql("""update dest4 set (c1) = ('update_2') where c2 = 2""").collect()
+ sql("""update dest4 set (c1) = ('update_3') where c2 = 3""").collect()
+
+ // update without compaction three times, there should be three
'.deletedelta' files
+ var deleteDeltaFiles = getDeltaFiles(carbonFile,
CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ assert(deleteDeltaFiles.length == 3)
+
+ CarbonProperties.getInstance().
+ addProperty(CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE,
"true")
+
+ // update with compaction
+ sql("""update dest4 set (c1) = ('update_4') where c2 = 4""").collect()
+ sql("""update dest4 set (c1) = ('update_5') where c2 = 5""").collect()
+ sql("""update dest4 set (c1) = ('update_6') where c2 = 6""").collect()
+
+ checkAnswer(
+ sql("select c1 from dest4 order by c2"),
+ Seq(Row("update_1"),
+ Row("update_2"),
+ Row("update_3"),
+ Row("update_4"),
+ Row("update_5"),
+ Row("update_6"),
+ Row("g"),
+ Row("h"),
+ Row("i"),
+ Row("j"))
+ )
+ sql("""drop table dest4""")
+ }
+
+ test("test IUD Horizontal Compaction when
CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION >= 3") {
+ CarbonProperties.getInstance().
+
addProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
"3")
+ sql("""drop database if exists iud4 cascade""")
+ sql("""create database iud4""")
+ sql("""use iud4""")
+ sql("""create table dest4 (c1 string,c2 int,c3 string,c5 string) STORED AS
carbondata""")
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table
dest4""")
+ val carbonTable = CarbonEnv.getCarbonTable(Some("iud4"),
"dest4")(sqlContext.sparkSession)
+ val identifier = carbonTable.getAbsoluteTableIdentifier()
+ val dataFilesDir = CarbonTablePath.getSegmentPath(identifier.getTablePath,
"0")
+ val carbonFile = FileFactory.getCarbonFile(dataFilesDir)
+
+ sql("""update dest4 set (c1) = ('update_1') where c2 = 1""").collect()
+ sql("""update dest4 set (c1) = ('update_2') where c2 = 2""").collect()
+ sql("""update dest4 set (c1) = ('update_3') where c2 = 3""").collect()
+
+ // at first three update, '.deletedelta' files count is less than or equal
to threshold
+ // so there is no horizontal compaction
+ var deleteDeltaFiles = getDeltaFiles(carbonFile,
CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ assert(deleteDeltaFiles.length == 3)
+
+ sql("""update dest4 set (c1) = ('update_4') where c2 = 4""").collect()
+
+ // there is horizontal compaction at forth update
+ // three '.deletedelta' files for previous update operations
+ // one '.deletedelta' file for update operation this time
+ // one '.deletedelta' file for horizontal compaction
+ // so there must be five '.deletedelta' files
+ deleteDeltaFiles = getDeltaFiles(carbonFile,
CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ assert(deleteDeltaFiles.length == 5)
+
+ checkAnswer(
+ sql("select c1 from dest4 order by c2"),
+ Seq(Row("update_1"),
+ Row("update_2"),
+ Row("update_3"),
+ Row("update_4"),
+ Row("e"),
+ Row("f"),
+ Row("g"),
+ Row("h"),
+ Row("i"),
+ Row("j"))
+ )
+ sql("""drop table dest4""")
+ }
+
override def afterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
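
The getDeltaFiles helper used by both tests is not part of this diff;
presumably it collects the files under the segment directory that carry a
given extension. A rough, hypothetical Java equivalent (the real helper lives
in the test sources and may use CarbonFile instead of java.io.File):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    final class DeltaFileLister {
      // Hypothetical stand-in for the test helper getDeltaFiles(carbonFile, ext):
      // collect every file under the segment directory whose name ends with ext,
      // e.g. ".deletedelta".
      static List<File> getDeltaFiles(File dir, String ext) {
        List<File> result = new ArrayList<>();
        File[] children = dir.listFiles();
        if (children == null) {
          return result;
        }
        for (File child : children) {
          if (child.isDirectory()) {
            result.addAll(getDeltaFiles(child, ext)); // descend into part dirs
          } else if (child.getName().endsWith(ext)) {
            result.add(child);
          }
        }
        return result;
      }
    }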
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e873eef..f22a0fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1039,22 +1039,10 @@ public final class CarbonDataMergerUtil {
     if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
       int numberDeleteDeltaFilesThreshold =
           CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      List<Segment> deleteSegments = new ArrayList<>();
       for (Segment seg : segments) {
-        if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
-            numberDeleteDeltaFilesThreshold)) {
-          deleteSegments.add(seg);
-        }
-      }
-      if (deleteSegments.size() > 0) {
-        // This Code Block Append the Segname along with the Blocks selected for Merge instead of
-        // only taking the segment name. This will help to parallelize better for each block
-        // in case of Delete Horizontal Compaction.
-        for (Segment segName : deleteSegments) {
-          List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
-              numberDeleteDeltaFilesThreshold);
-          validSegments.addAll(tempSegments);
-        }
+        List<String> segmentNoAndBlocks = checkDeleteDeltaFilesInSeg(seg,
+            segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold);
+        validSegments.addAll(segmentNoAndBlocks);
       }
     } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
       int numberUpdateDeltaFilesThreshold =
@@ -1138,73 +1126,43 @@ public final class CarbonDataMergerUtil {
   }

   /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
+   * Check whether the segment passed qualifies for IUD delete delta compaction or not,
+   * i.e., if the number of delete delta files present in the segment is more than
+   * numberDeltaFilesThreshold, this segment will be selected.
    *
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
+   * @param seg segment to be qualified
+   * @param segmentUpdateStatusManager segments & blocks details management
+   * @param numberDeltaFilesThreshold threshold of delete delta files
+   * @return block list of the segment
    */
-  private static boolean checkDeleteDeltaFilesInSeg(Segment seg,
+  private static List<String> checkDeleteDeltaFilesInSeg(Segment seg,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+    List<String> blockLists = new ArrayList<>();
     Set<String> uniqueBlocks = new HashSet<String>();
     List<String> blockNameList =
         segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
+    for (String blockName : blockNameList) {
+      List<String> deleteDeltaFiles =
           segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-      if (null != deleteDeltaFiles) {
+      if (null != deleteDeltaFiles && deleteDeltaFiles.size() > numberDeltaFilesThreshold) {
         // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
         // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
         // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
         // Spill Over Blocks will choose files with unique taskID.
-        for (CarbonFile blocks : deleteDeltaFiles) {
+        for (String deleteDeltaFile : deleteDeltaFiles) {
           // Get Task ID and the Timestamp from the Block name for e.g.
           // part-0-3-1481084721319.carbondata => "3-1481084721319"
-          String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+          String task = CarbonTablePath.DataFileUtil.getTaskNo(deleteDeltaFile);
           String timestamp =
-              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
-          String taskAndTimeStamp = task + "-" + timestamp;
+              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(deleteDeltaFile);
+          String taskAndTimeStamp = task + CarbonCommonConstants.HYPHEN + timestamp;
           uniqueBlocks.add(taskAndTimeStamp);
+          if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
+            blockLists.add(seg.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR + blockName);
+            break;
+          }
         }
-
-        if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
-   */
-
-  private static List<String> getDeleteDeltaFilesInSeg(Segment seg,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
-
-    List<String> blockLists = new ArrayList<>();
-    List<String> blockNameList =
-        segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
-          segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-
-      if (null != deleteDeltaFiles && (deleteDeltaFiles.length > numberDeltaFilesThreshold)) {
-        blockLists.add(seg.getSegmentNo() + "/" + blockName);
       }
     }
     return blockLists;
@@ -1246,20 +1204,16 @@ public final class CarbonDataMergerUtil {

     // set the update status.
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);

-    CarbonFile[] deleteDeltaFiles =
+    List<String> deleteFilePathList =
         segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);

     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
-    List<String> deleteFilePathList = new ArrayList<>();
-    if (null != deleteDeltaFiles && deleteDeltaFiles.length > 0 && null != deleteDeltaFiles[0]
-        .getParentFile()) {
-      String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
-          + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
-
-      for (CarbonFile cFile : deleteDeltaFiles) {
-        deleteFilePathList.add(cFile.getCanonicalPath());
-      }
+    if (deleteFilePathList.size() > 0) {
+      String deleteDeltaFilePath = deleteFilePathList.get(0);
+      String fullBlockFilePath = deleteDeltaFilePath.substring(0,
+          deleteDeltaFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)) +
+          CarbonCommonConstants.FILE_SEPARATOR + destFileName;

       CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
       blockDetails.setBlockName(blockName);
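
Change 2 is visible across the two CarbonDataMergerUtil hunks above: the old
qualify-then-collect pair of segment traversals (checkDeleteDeltaFilesInSeg
returning a boolean, then getDeleteDeltaFilesInSeg re-listing the same files)
is folded into a single pass that emits "segmentNo/blockName" entries the
moment the unique-block count crosses the threshold. A stripped-down sketch of
that shape (types and signatures simplified; not CarbonData's real API):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class SinglePassQualifier {
      // One traversal instead of two: for each block, count delete delta files
      // by unique "task-timestamp" key and record "segmentNo/blockName" as soon
      // as the threshold is crossed, then stop scanning that block's files.
      static List<String> qualifiedBlocks(String segmentNo,
          Map<String, List<String>> deltaFilesByBlock, int threshold) {
        List<String> qualified = new ArrayList<>();
        // Shared across blocks, mirroring uniqueBlocks in the patched method.
        Set<String> uniqueKeys = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : deltaFilesByBlock.entrySet()) {
          for (String deltaFile : entry.getValue()) {
            uniqueKeys.add(taskAndTimestamp(deltaFile));
            if (uniqueKeys.size() > threshold) {
              qualified.add(segmentNo + "/" + entry.getKey());
              break; // early exit, mirroring the patched loop
            }
          }
        }
        return qualified;
      }

      // Simplified stand-in for DataFileUtil.getTaskNo plus
      // getTimeStampFromDeleteDeltaFile; here the file name is assumed to be
      // unique per task-timestamp already.
      static String taskAndTimestamp(String fileName) {
        return fileName;
      }
    }

Besides halving the traversals, the early break stops scanning a block's
remaining files once the threshold is crossed, where the old code always
walked every delete delta file and then walked them again to collect paths.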