marchpure commented on a change in pull request #3986:
URL: https://github.com/apache/carbondata/pull/3986#discussion_r505670979
##########
File path:
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
##########
@@ -1210,6 +1198,39 @@ private static boolean
checkDeleteDeltaFilesInSeg(Segment seg,
return blockLists;
}
+ private static List<String> checkAndGetDeleteDeltaFilesInSeg(Segment seg,
+ SegmentUpdateStatusManager segmentUpdateStatusManager, int
numberDeltaFilesThreshold) {
+
+ List<String> blockLists = new ArrayList<>();
+
+ Map<String, List<CarbonFile>> blockAndDeleteDeltaFilesMap =
+ segmentUpdateStatusManager.getDeleteDeltaFilesForSegment(seg);
+
+ List<String> blockNameList =
+ segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
+
+ Set<String> uniqueBlocks = new HashSet<String>();
+ for (final String blockName : blockNameList) {
+
+ List<CarbonFile> deleteDeltaFiles =
blockAndDeleteDeltaFilesMap.get(blockName);
+
+ if (null != deleteDeltaFiles) {
+ for (CarbonFile blocks : deleteDeltaFiles) {
Review comment:
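Suggestion: `if (deleteDeltaFiles.size() < numberDeltaFilesThreshold) continue;` — skip a block early when it does not have enough delete delta files.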
##########
File path:
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
##########
@@ -1039,22 +1039,10 @@ private static boolean
isSegmentValid(LoadMetadataDetails seg) {
if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
int numberDeleteDeltaFilesThreshold =
CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
- List<Segment> deleteSegments = new ArrayList<>();
for (Segment seg : segments) {
- if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
Review comment:
Please remove the checkDeleteDeltaFilesInSeg function; its call site is removed in this change.
##########
File path:
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
##########
@@ -455,6 +455,51 @@ public boolean accept(CarbonFile pathName) {
return null;
}
+ public Map<String, List<CarbonFile>> getDeleteDeltaFilesForSegment(final
Segment seg) {
Review comment:
Please remove the getDeleteDeltaFilesList function.
##########
File path:
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
##########
@@ -1210,6 +1198,39 @@ private static boolean
checkDeleteDeltaFilesInSeg(Segment seg,
return blockLists;
}
+ private static List<String> checkAndGetDeleteDeltaFilesInSeg(Segment seg,
+ SegmentUpdateStatusManager segmentUpdateStatusManager, int
numberDeltaFilesThreshold) {
+
+ List<String> blockLists = new ArrayList<>();
+
+ Map<String, List<CarbonFile>> blockAndDeleteDeltaFilesMap =
+ segmentUpdateStatusManager.getDeleteDeltaFilesForSegment(seg);
+
+ List<String> blockNameList =
+ segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
+
+ Set<String> uniqueBlocks = new HashSet<String>();
+ for (final String blockName : blockNameList) {
+
+ List<CarbonFile> deleteDeltaFiles =
blockAndDeleteDeltaFilesMap.get(blockName);
+
+ if (null != deleteDeltaFiles) {
+ for (CarbonFile blocks : deleteDeltaFiles) {
Review comment:
Please rename the loop variable `blocks` to `block`, since each iteration holds a single CarbonFile.
##########
File path:
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
##########
@@ -455,6 +455,51 @@ public boolean accept(CarbonFile pathName) {
return null;
}
+ public Map<String, List<CarbonFile>> getDeleteDeltaFilesForSegment(final
Segment seg) {
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), seg.getSegmentNo());
+ CarbonFile segDir = FileFactory.getCarbonFile(segmentPath);
+ CarbonFile[] allDeleteDeltaFilesOfSegment = segDir.listFiles(new
CarbonFileFilter() {
+ @Override
+ public boolean accept(CarbonFile pathName) {
+ String fileName = pathName.getName();
+ return (pathName.getSize() > 0) &&
Review comment:
Each getSize() call triggers a separate S3 I/O request.
Please remove the getSize() check.
##########
File path:
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
##########
@@ -455,6 +455,51 @@ public boolean accept(CarbonFile pathName) {
return null;
}
+ public Map<String, List<CarbonFile>> getDeleteDeltaFilesForSegment(final
Segment seg) {
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), seg.getSegmentNo());
Review comment:
If SegmentUpdateDetails does not contain this segment, return an empty
result immediately. This avoids a lot of unnecessary I/O.
##########
File path:
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
##########
@@ -1210,6 +1198,39 @@ private static boolean
checkDeleteDeltaFilesInSeg(Segment seg,
return blockLists;
}
+ private static List<String> checkAndGetDeleteDeltaFilesInSeg(Segment seg,
+ SegmentUpdateStatusManager segmentUpdateStatusManager, int
numberDeltaFilesThreshold) {
+
+ List<String> blockLists = new ArrayList<>();
+
+ Map<String, List<CarbonFile>> blockAndDeleteDeltaFilesMap =
+ segmentUpdateStatusManager.getDeleteDeltaFilesForSegment(seg);
+
+ List<String> blockNameList =
+ segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
+
+ Set<String> uniqueBlocks = new HashSet<String>();
+ for (final String blockName : blockNameList) {
+
+ List<CarbonFile> deleteDeltaFiles =
blockAndDeleteDeltaFilesMap.get(blockName);
+
+ if (null != deleteDeltaFiles) {
+ for (CarbonFile blocks : deleteDeltaFiles) {
+ String task =
CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+ String timestamp =
+
CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+ String taskAndTimeStamp = task + "-" + timestamp;
+ uniqueBlocks.add(taskAndTimeStamp);
Review comment:
if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
blockLists.add(seg.getSegmentNo() + "/" + blockName);
break;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org