This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f3b7427a597 [MINOR] Code refactor about ClusteringPlanStrategy (#9128)
f3b7427a597 is described below
commit f3b7427a597aa1ae8618ae96bc87ac25f92d16c3
Author: ksmou <[email protected]>
AuthorDate: Thu Jul 6 10:05:11 2023 +0800
[MINOR] Code refactor about ClusteringPlanStrategy (#9128)
---
.../PartitionAwareClusteringPlanStrategy.java | 56 +++++++++++++++++++++-
.../FlinkSizeBasedClusteringPlanStrategy.java | 53 +-------------------
.../JavaSizeBasedClusteringPlanStrategy.java | 53 +-------------------
.../SparkSizeBasedClusteringPlanStrategy.java | 53 +-------------------
4 files changed, 57 insertions(+), 158 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 5b6ee9075bd..0401672dfb9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
@@ -34,6 +35,7 @@ import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
@@ -53,8 +55,54 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
  /**
   * Create Clustering group based on files eligible for clustering in the partition.
   */
-  protected abstract Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath,
-                                                                                     List<FileSlice> fileSlices);
+  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
+    long totalSizeSoFar = 0;
+
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+      // check if max size is reached and create new group, if needed.
+      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+      }
+
+      // Add to the current file-group
+      currentGroup.add(currentSlice);
+      // assume each file group size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSize;
+    }
+
+    if (!currentGroup.isEmpty()) {
+      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
+    }
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
+  }
/**
* Return list of partition paths to be considered for clustering.
@@ -135,4 +183,8 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
    }
    return partitionPaths;
  }
+
+  protected int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
+    return (int) Math.ceil(groupSize / (double) targetFileSize);
+  }
}
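For readers skimming the diff: the body pulled up here is a greedy, size-based grouping pass. Slices are sorted by base-file size in descending order (falling back to the configured parquet max file size when a slice has no base file), then packed into groups until the next slice would push the group past the max-bytes-per-group budget. A minimal, self-contained sketch of the same idea follows; SliceInfo, Group, and the two size constants are illustrative stand-ins, not Hudi APIs:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Stand-ins for FileSlice / the plan output; illustrative only.
    record SliceInfo(String fileId, long sizeBytes) {}
    record Group(List<SliceInfo> slices, int numOutputGroups) {}

    public class GreedyGroupingSketch {
      // Assumed budgets, mirroring getClusteringMaxBytesInGroup()
      // and getClusteringTargetFileMaxBytes().
      static final long MAX_BYTES_PER_GROUP = 2L << 30; // 2 GB
      static final long TARGET_FILE_BYTES = 1L << 30;   // 1 GB

      static List<Group> buildGroups(List<SliceInfo> slices) {
        List<SliceInfo> sorted = new ArrayList<>(slices);
        // Largest slices first, so groups pack tightly against the budget.
        sorted.sort(Comparator.comparingLong(SliceInfo::sizeBytes).reversed());

        List<Group> groups = new ArrayList<>();
        List<SliceInfo> current = new ArrayList<>();
        long totalSoFar = 0;
        for (SliceInfo s : sorted) {
          // Close the current group once adding this slice would exceed the cap.
          if (totalSoFar + s.sizeBytes() > MAX_BYTES_PER_GROUP && !current.isEmpty()) {
            groups.add(new Group(current, numOutputGroups(totalSoFar)));
            current = new ArrayList<>();
            totalSoFar = 0;
          }
          current.add(s);
          totalSoFar += s.sizeBytes();
        }
        // The real code also admits a single-slice group when configured to.
        if (current.size() > 1) {
          groups.add(new Group(current, numOutputGroups(totalSoFar)));
        }
        return groups;
      }

      static int numOutputGroups(long groupSize) {
        return (int) Math.ceil(groupSize / (double) TARGET_FILE_BYTES);
      }
    }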
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 57734acd695..a232bf54c16 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -34,7 +33,6 @@ import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPla
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,52 +57,7 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
  @Override
  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
-    HoodieWriteConfig writeConfig = getWriteConfig();
-
-    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
-    List<FileSlice> currentGroup = new ArrayList<>();
-
-    // Sort fileSlices before dividing, which makes dividing more compact
-    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
-
-    long totalSizeSoFar = 0;
-
-    for (FileSlice currentSlice : sortedFileSlices) {
-      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
-      // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-        currentGroup = new ArrayList<>();
-        totalSizeSoFar = 0;
-      }
-
-      // Add to the current file-group
-      currentGroup.add(currentSlice);
-      // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSize;
-    }
-
-    if (!currentGroup.isEmpty()) {
-      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-      }
-    }
-
-    return fileSliceGroups.stream().map(fileSliceGroup ->
-        HoodieClusteringGroup.newBuilder()
-            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-            .setNumOutputFileGroups(fileSliceGroup.getRight())
-            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-            .build());
+    return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
  }
@Override
@@ -122,8 +75,4 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
        // Only files that have base file size smaller than small file size are eligible.
        .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
  }
-
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
}
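The private helper removed here (and from the Java and Spark strategies below) is byte-for-byte the method promoted to protected on the parent class. Its ceiling division simply sizes the plan's output; for instance, assuming a packed group of ~2.5 GB and a 1 GB target file size:

    long groupSize = 2_684_354_560L;      // ~2.5 GB of input slices
    long targetFileSize = 1_073_741_824L; // 1 GB target output file
    int numOutputGroups = (int) Math.ceil(groupSize / (double) targetFileSize); // -> 3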
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index d8f0c5fc804..03d416452d2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -35,7 +34,6 @@ import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPla
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,52 +58,7 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
  @Override
  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
-    HoodieWriteConfig writeConfig = getWriteConfig();
-
-    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
-    List<FileSlice> currentGroup = new ArrayList<>();
-
-    // Sort fileSlices before dividing, which makes dividing more compact
-    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
-
-    long totalSizeSoFar = 0;
-
-    for (FileSlice currentSlice : sortedFileSlices) {
-      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
-      // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-        currentGroup = new ArrayList<>();
-        totalSizeSoFar = 0;
-      }
-
-      // Add to the current file-group
-      currentGroup.add(currentSlice);
-      // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSize;
-    }
-
-    if (!currentGroup.isEmpty()) {
-      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-      }
-    }
-
-    return fileSliceGroups.stream().map(fileSliceGroup ->
-        HoodieClusteringGroup.newBuilder()
-            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-            .setNumOutputFileGroups(fileSliceGroup.getRight())
-            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-            .build());
+    return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
  }
@Override
@@ -123,8 +76,4 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
        // Only files that have basefile size smaller than small file size are eligible.
        .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
  }
-
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 9ad996fc3b8..2c82810d175 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -35,7 +34,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,52 +58,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T>
  @Override
  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
-    HoodieWriteConfig writeConfig = getWriteConfig();
-
-    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
-    List<FileSlice> currentGroup = new ArrayList<>();
-
-    // Sort fileSlices before dividing, which makes dividing more compact
-    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
-
-    long totalSizeSoFar = 0;
-
-    for (FileSlice currentSlice : sortedFileSlices) {
-      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
-      // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-        currentGroup = new ArrayList<>();
-        totalSizeSoFar = 0;
-      }
-
-      // Add to the current file-group
-      currentGroup.add(currentSlice);
-      // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSize;
-    }
-
-    if (!currentGroup.isEmpty()) {
-      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-      }
-    }
-
-    return fileSliceGroups.stream().map(fileSliceGroup ->
-        HoodieClusteringGroup.newBuilder()
-            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-            .setNumOutputFileGroups(fileSliceGroup.getRight())
-            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-            .build());
+    return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
  }
@Override
@@ -123,8 +76,4 @@ public class SparkSizeBasedClusteringPlanStrategy<T>
        // Only files that have base file size smaller than small file size are eligible.
        .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
  }
-
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
}
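Taken together, the commit is a straightforward pull-up into a template method: the abstract buildClusteringGroupsForPartition gains a shared default, and each engine strategy now delegates to super (the three overrides could arguably be dropped entirely in a follow-up). A hedged sketch of the resulting shape, with invented names standing in for the Hudi class hierarchy:

    // Illustrative shape of the hierarchy after this commit; names are invented.
    abstract class BasePlanStrategy {
      // Shared default, formerly duplicated in every subclass.
      protected String buildGroups(String partitionPath) {
        return "clustering groups for " + partitionPath;
      }

      // Engine strategies still specialize slice eligibility.
      protected abstract boolean isEligible(long baseFileSizeBytes);
    }

    class EnginePlanStrategy extends BasePlanStrategy {
      @Override
      protected String buildGroups(String partitionPath) {
        return super.buildGroups(partitionPath); // pure delegation, as in this diff
      }

      @Override
      protected boolean isEligible(long baseFileSizeBytes) {
        return baseFileSizeBytes < 300L * 1024 * 1024; // e.g. a small-file limit
      }
    }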