This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f3b7427a597 [MINOR] Code refactor about ClusteringPlanStrategy (#9128)
f3b7427a597 is described below

commit f3b7427a597aa1ae8618ae96bc87ac25f92d16c3
Author: ksmou <[email protected]>
AuthorDate: Thu Jul 6 10:05:11 2023 +0800

    [MINOR] Code refactor about ClusteringPlanStrategy (#9128)
---
 .../PartitionAwareClusteringPlanStrategy.java      | 56 +++++++++++++++++++++-
 .../FlinkSizeBasedClusteringPlanStrategy.java      | 53 +-------------------
 .../JavaSizeBasedClusteringPlanStrategy.java       | 53 +-------------------
 .../SparkSizeBasedClusteringPlanStrategy.java      | 53 +-------------------
 4 files changed, 57 insertions(+), 158 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 5b6ee9075bd..0401672dfb9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
@@ -34,6 +35,7 @@ import 
org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -53,8 +55,54 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
   /**
    * Create Clustering group based on files eligible for clustering in the 
partition.
    */
-  protected abstract Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath,
-                                                                               
      List<FileSlice> fileSlices);
+  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
+    long totalSizeSoFar = 0;
+
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
+      // check if max size is reached and create new group, if needed.
+      if (totalSizeSoFar + currentSize > 
writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+      }
+
+      // Add to the current file-group
+      currentGroup.add(currentSlice);
+      // assume each file group size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSize;
+    }
+
+    if (!currentGroup.isEmpty()) {
+      if (currentGroup.size() > 1 || 
writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
+    }
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
+  }
 
   /**
    * Return list of partition paths to be considered for clustering.
@@ -135,4 +183,8 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
     }
     return partitionPaths;
   }
+
+  /**
+   * Computes how many output file groups a clustering group should produce:
+   * {@code groupSize} divided by {@code targetFileSize}, rounded up.
+   *
+   * @param groupSize      total size of the input group
+   * @param targetFileSize desired size of each output file
+   * @return the rounded-up quotient, narrowed to {@code int}
+   */
+  protected int getNumberOfOutputFileGroups(long groupSize, long 
targetFileSize) {
+    // Divide as double so the fractional part survives and Math.ceil can round it up.
+    return (int) Math.ceil(groupSize / (double) targetFileSize);
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 57734acd695..a232bf54c16 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -34,7 +33,6 @@ import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPla
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,52 +57,7 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
 
   @Override
   protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
-    HoodieWriteConfig writeConfig = getWriteConfig();
-
-    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
-    List<FileSlice> currentGroup = new ArrayList<>();
-
-    // Sort fileSlices before dividing, which makes dividing more compact
-    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
-
-    long totalSizeSoFar = 0;
-
-    for (FileSlice currentSlice : sortedFileSlices) {
-      long currentSize = currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
-      // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar + currentSize > 
writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-        currentGroup = new ArrayList<>();
-        totalSizeSoFar = 0;
-      }
-
-      // Add to the current file-group
-      currentGroup.add(currentSlice);
-      // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSize;
-    }
-
-    if (!currentGroup.isEmpty()) {
-      if (currentGroup.size() > 1 || 
writeConfig.shouldClusteringSingleGroup()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-      }
-    }
-
-    return fileSliceGroups.stream().map(fileSliceGroup ->
-        HoodieClusteringGroup.newBuilder()
-            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-            .setNumOutputFileGroups(fileSliceGroup.getRight())
-            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-            .build());
+    return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
   }
 
   @Override
@@ -122,8 +75,4 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
         // Only files that have base file size smaller than small file size 
are eligible.
         .filter(slice -> 
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < 
getWriteConfig().getClusteringSmallFileLimit());
   }
-
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) 
{
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index d8f0c5fc804..03d416452d2 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -35,7 +34,6 @@ import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPla
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,52 +58,7 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
 
   @Override
   protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
-    HoodieWriteConfig writeConfig = getWriteConfig();
-
-    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
-    List<FileSlice> currentGroup = new ArrayList<>();
-
-    // Sort fileSlices before dividing, which makes dividing more compact
-    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
-
-    long totalSizeSoFar = 0;
-
-    for (FileSlice currentSlice : sortedFileSlices) {
-      long currentSize = currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
-      // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar + currentSize > 
writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-        currentGroup = new ArrayList<>();
-        totalSizeSoFar = 0;
-      }
-
-      // Add to the current file-group
-      currentGroup.add(currentSlice);
-      // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSize;
-    }
-
-    if (!currentGroup.isEmpty()) {
-      if (currentGroup.size() > 1 || 
writeConfig.shouldClusteringSingleGroup()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-      }
-    }
-
-    return fileSliceGroups.stream().map(fileSliceGroup ->
-        HoodieClusteringGroup.newBuilder()
-            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-            .setNumOutputFileGroups(fileSliceGroup.getRight())
-            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-            .build());
+    return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
   }
 
   @Override
@@ -123,8 +76,4 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
         // Only files that have basefile size smaller than small file size are 
eligible.
         .filter(slice -> 
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < 
getWriteConfig().getClusteringSmallFileLimit());
   }
-
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) 
{
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 9ad996fc3b8..2c82810d175 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -35,7 +34,6 @@ import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,52 +58,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T>
 
   @Override
   protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
-    HoodieWriteConfig writeConfig = getWriteConfig();
-
-    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
-    List<FileSlice> currentGroup = new ArrayList<>();
-
-    // Sort fileSlices before dividing, which makes dividing more compact
-    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
-    sortedFileSlices.sort((o1, o2) -> (int)
-        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
-            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
-
-    long totalSizeSoFar = 0;
-
-    for (FileSlice currentSlice : sortedFileSlices) {
-      long currentSize = currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
-      // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar + currentSize > 
writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-        currentGroup = new ArrayList<>();
-        totalSizeSoFar = 0;
-      }
-
-      // Add to the current file-group
-      currentGroup.add(currentSlice);
-      // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSize;
-    }
-
-    if (!currentGroup.isEmpty()) {
-      if (currentGroup.size() > 1 || 
writeConfig.shouldClusteringSingleGroup()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-        LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
-        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
-      }
-    }
-
-    return fileSliceGroups.stream().map(fileSliceGroup ->
-        HoodieClusteringGroup.newBuilder()
-          .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-          .setNumOutputFileGroups(fileSliceGroup.getRight())
-          .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-          .build());
+    return super.buildClusteringGroupsForPartition(partitionPath, fileSlices);
   }
 
   @Override
@@ -123,8 +76,4 @@ public class SparkSizeBasedClusteringPlanStrategy<T>
         // Only files that have base file size smaller than small file size 
are eligible.
         .filter(slice -> 
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < 
getWriteConfig().getClusteringSmallFileLimit());
   }
-
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) 
{
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
 }

Reply via email to