This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a439ea0f449 [HUDI-6457] Keep JavaSizeBasedClusteringPlanStrategy and SparkSizeBasedClusteringPlanStrategy aligned (#9099)
a439ea0f449 is described below
commit a439ea0f449fb334f0823323651ec1512f4cd5df
Author: ksmou <[email protected]>
AuthorDate: Fri Jun 30 19:39:31 2023 +0800
    [HUDI-6457] Keep JavaSizeBasedClusteringPlanStrategy and SparkSizeBasedClusteringPlanStrategy aligned (#9099)
---
.../JavaSizeBasedClusteringPlanStrategy.java | 53 +++++++++++++---------
1 file changed, 32 insertions(+), 21 deletions(-)
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index fe66cedb133..d8f0c5fc804 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -60,41 +60,52 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
   @Override
   protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
     long totalSizeSoFar = 0;
-    HoodieWriteConfig writeConfig = getWriteConfig();
-    for (FileSlice currentSlice : fileSlices) {
-      // assume each filegroup size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
         int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-              + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
         fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
         currentGroup = new ArrayList<>();
         totalSizeSoFar = 0;
       }
+
+      // Add to the current file-group
       currentGroup.add(currentSlice);
-      // totalSizeSoFar could be 0 when new group was created in the previous conditional block.
-      // reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present.
-      if (totalSizeSoFar == 0) {
-        totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
-      }
+      // assume each file group size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSize;
     }
+
     if (!currentGroup.isEmpty()) {
-      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
-          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
     }
-
-    return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
-        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-        .setNumOutputFileGroups(fileSliceGroup.getRight())
-        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-        .build());
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
   }
 
   @Override
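
For readers who want to trace the new grouping behavior without checking out the branch: the sketch below reproduces the same bin-packing idea (sort slices by base-file size descending, pack them greedily into groups bounded by the max-bytes-per-group budget, and drop a trailing single-slice group unless single-group clustering is enabled) as a standalone Java program. It deliberately uses plain long sizes and hard-coded numbers as stand-ins for FileSlice and HoodieWriteConfig, so it is only an illustration of the logic in the diff above, not the Hudi API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Standalone sketch of the grouping logic introduced above. Plain long sizes
// stand in for FileSlice, and the hard-coded numbers stand in for the
// clustering/parquet settings normally read from HoodieWriteConfig.
// Illustrative only; not the Hudi API.
public class SizeBasedGroupingSketch {

  public static void main(String[] args) {
    // Base-file sizes of the candidate file slices, in MB for readability.
    List<Long> sliceSizes = List.of(900L, 300L, 1500L, 100L, 700L, 1200L);
    long maxBytesPerGroup = 2000L;           // stand-in for clustering max bytes per group
    long targetFileMaxBytes = 1000L;         // stand-in for clustering target file max bytes
    boolean clusterSingleSliceGroup = false; // stand-in for the single-group switch

    for (List<Long> group : buildGroups(sliceSizes, maxBytesPerGroup, clusterSingleSliceGroup)) {
      long groupBytes = group.stream().mapToLong(Long::longValue).sum();
      System.out.println(group + " -> " + outputFileGroups(groupBytes, targetFileMaxBytes)
          + " output file group(s)");
    }
  }

  // Sort descending by size, then pack greedily into groups bounded by maxBytesPerGroup.
  static List<List<Long>> buildGroups(List<Long> sliceSizes, long maxBytesPerGroup,
                                      boolean clusterSingleSliceGroup) {
    List<Long> sorted = new ArrayList<>(sliceSizes);
    sorted.sort(Comparator.reverseOrder());

    List<List<Long>> groups = new ArrayList<>();
    List<Long> currentGroup = new ArrayList<>();
    long totalSizeSoFar = 0;

    for (long currentSize : sorted) {
      // Close the current group before it would exceed the per-group byte budget.
      if (totalSizeSoFar + currentSize > maxBytesPerGroup && !currentGroup.isEmpty()) {
        groups.add(currentGroup);
        currentGroup = new ArrayList<>();
        totalSizeSoFar = 0;
      }
      currentGroup.add(currentSize);
      totalSizeSoFar += currentSize;
    }

    // A trailing single-slice group is only kept when single-group clustering is enabled,
    // mirroring the (currentGroup.size() > 1 || shouldClusteringSingleGroup()) check above.
    if (!currentGroup.isEmpty() && (currentGroup.size() > 1 || clusterSingleSliceGroup)) {
      groups.add(currentGroup);
    }
    return groups;
  }

  // Rough analogue of getNumberOfOutputFileGroups: how many target-sized files a group yields.
  static long outputFileGroups(long groupBytes, long targetFileMaxBytes) {
    return Math.max(1, (long) Math.ceil(groupBytes / (double) targetFileMaxBytes));
  }
}

Compiled and run as-is, it plans three groups for the sample sizes ([1500], [1200] and [900, 700, 300, 100]) and reports two target-sized output file groups for each, which is the packing the sorted, size-bounded loop above would also produce.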