This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d705dcc4188 [HUDI-5173] Skip if there is only one file in
clusteringGroup (#7159)
d705dcc4188 is described below
commit d705dcc4188223fbd824f36a5d211abeda7b1f23
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Fri Feb 24 10:23:25 2023 +0800
[HUDI-5173] Skip if there is only one file in clusteringGroup (#7159)
Introduce a new option
'hoodie.clustering.plan.strategy.single.group.clustering.enabled' to allow
disabling clustering for a single file group. When clustering sort is also
disabled, clustering a single file group is unnecessary and incurs needless
read/write costs.
---
.../apache/hudi/config/HoodieClusteringConfig.java | 11 +++
.../org/apache/hudi/config/HoodieWriteConfig.java | 12 +++
.../FlinkSizeBasedClusteringPlanStrategy.java | 32 ++++++--
.../TestFlinkSizeBasedClusteringPlanStrategy.java | 96 ++++++++++++++++++++++
.../SparkSizeBasedClusteringPlanStrategy.java | 10 ++-
...TestSparkBuildClusteringGroupsForPartition.java | 93 +++++++++++++++++++++
.../realtime/TestHoodieRealtimeRecordReader.java | 1 +
7 files changed, 243 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index bfcd4315d29..b76a66d91c5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -182,6 +182,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Each group can produce 'N'
(CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups");
+ public static final ConfigProperty<Boolean>
PLAN_STRATEGY_SINGLE_GROUP_CLUSTERING_ENABLED = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX +
"single.group.clustering.enabled")
+ .defaultValue(true)
+ .sinceVersion("0.14.0")
+ .withDocumentation("Whether to generate clustering plan when there is
only one file group involved, by default true");
+
public static final ConfigProperty<String> PLAN_STRATEGY_SORT_COLUMNS =
ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns")
.noDefaultValue()
@@ -469,6 +475,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
+ public Builder withSingleGroupClusteringEnabled(Boolean enabled) {
+ clusteringConfig.setValue(PLAN_STRATEGY_SINGLE_GROUP_CLUSTERING_ENABLED,
String.valueOf(enabled));
+ return this;
+ }
+
public Builder
withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode mode) {
clusteringConfig.setValue(PLAN_PARTITION_FILTER_MODE_NAME.key(),
mode.toString());
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f82ac90c424..2ccd0435d3a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1601,10 +1601,22 @@ public class HoodieWriteConfig extends HoodieConfig {
return
getInt(HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST);
}
+ public boolean isSingleGroupClusteringEnabled() {
+ return
getBoolean(HoodieClusteringConfig.PLAN_STRATEGY_SINGLE_GROUP_CLUSTERING_ENABLED);
+ }
+
+ public boolean shouldClusteringSingleGroup() {
+ return isClusteringSortEnabled() || isSingleGroupClusteringEnabled();
+ }
+
public String getClusteringSortColumns() {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS);
}
+ public boolean isClusteringSortEnabled() {
+ return
!StringUtils.isNullOrEmpty(getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS));
+ }
+
public HoodieClusteringConfig.LayoutOptimizationStrategy
getLayoutOptimizationStrategy() {
return HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(
getStringOrDefault(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 3abffe38d8b..ac320ceefe6 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -63,15 +63,23 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
+
+ // Sort fileSlices before dividing, which makes dividing more compact
+ List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+ sortedFileSlices.sort((o1, o2) -> (int)
+ ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize()
: writeConfig.getParquetMaxFileSize())
+ - (o1.getBaseFile().isPresent() ?
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
long totalSizeSoFar = 0;
- for (FileSlice currentSlice : fileSlices) {
+ for (FileSlice currentSlice : sortedFileSlices) {
+ long currentSize = currentSlice.getBaseFile().isPresent() ?
currentSlice.getBaseFile().get().getFileSize() :
writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
- // in now, every clustering group out put is 1 file group.
- if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() &&
!currentGroup.isEmpty()) {
+ if (totalSizeSoFar + currentSize >
writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding one clustering group " + totalSizeSoFar + " max
bytes: "
- + writeConfig.getClusteringMaxBytesInGroup() + " num input slices:
" + currentGroup.size());
- fileSliceGroups.add(Pair.of(currentGroup, 1));
+ + writeConfig.getClusteringMaxBytesInGroup() + " num input slices:
" + currentGroup.size() + " output groups: " + numOutputGroups);
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
@@ -79,11 +87,16 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
// Add to the current file-group
currentGroup.add(currentSlice);
// assume each file group size is ~= parquet.max.file.size
- totalSizeSoFar += currentSlice.getBaseFile().isPresent() ?
currentSlice.getBaseFile().get().getFileSize() :
writeConfig.getParquetMaxFileSize();
+ totalSizeSoFar += currentSize;
}
if (!currentGroup.isEmpty()) {
- fileSliceGroups.add(Pair.of(currentGroup, 1));
+ if (currentGroup.size() > 1 ||
writeConfig.shouldClusteringSingleGroup()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
+ LOG.info("Adding final clustering group " + totalSizeSoFar + " max
bytes: "
+ + writeConfig.getClusteringMaxBytesInGroup() + " num input slices:
" + currentGroup.size() + " output groups: " + numOutputGroups);
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ }
}
return fileSliceGroups.stream().map(fileSliceGroup ->
@@ -106,8 +119,11 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String
partition) {
return super.getFileSlicesEligibleForClustering(partition)
- // Only files that have basefile size smaller than small file size are
eligible.
+ // Only files that have base file size smaller than small file size
are eligible.
.filter(slice ->
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) <
getWriteConfig().getClusteringSmallFileLimit());
}
+ private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize)
{
+ return (int) Math.ceil(groupSize / (double) targetFileSize);
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestFlinkSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestFlinkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 00000000000..97f12abf322
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestFlinkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import
org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link FlinkSizeBasedClusteringPlanStrategy}.
+ */
+public class TestFlinkSizeBasedClusteringPlanStrategy {
+
+ @Mock
+ HoodieFlinkCopyOnWriteTable table;
+ @Mock
+ HoodieFlinkEngineContext context;
+ HoodieWriteConfig.Builder hoodieWriteConfigBuilder;
+
+ @BeforeEach
+ public void setUp() {
+ this.hoodieWriteConfigBuilder = HoodieWriteConfig
+ .newBuilder()
+ .withPath("path1");
+ }
+
+ @Test
+ public void testBuildClusteringGroupsForPartitionOnlyOneFile() {
+ String partition = "20221117";
+ String fileId = "fg-1";
+ List<FileSlice> fileSliceGroups = new ArrayList<>();
+ fileSliceGroups.add(generateFileSlice(partition, fileId, "0"));
+ // test buildClusteringGroupsForPartition with ClusteringSortColumns config
+ HoodieWriteConfig configWithSortEnabled =
hoodieWriteConfigBuilder.withClusteringConfig(
+ HoodieClusteringConfig.newBuilder()
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+ .withSingleGroupClusteringEnabled(false)
+ .withClusteringSortColumns("f0")
+ .build())
+ .build();
+ PartitionAwareClusteringPlanStrategy strategyWithSortEnabled = new
FlinkSizeBasedClusteringPlanStrategy(table, context, configWithSortEnabled);
+ Stream<HoodieClusteringGroup> groupStreamSort =
strategyWithSortEnabled.buildClusteringGroupsForPartition(partition,fileSliceGroups);
+ assertEquals(1, groupStreamSort.count());
+
+ // test buildClusteringGroupsForPartition without ClusteringSortColumns
config
+ HoodieWriteConfig configWithSortDisabled =
hoodieWriteConfigBuilder.withClusteringConfig(
+ HoodieClusteringConfig.newBuilder()
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+ .withSingleGroupClusteringEnabled(false)
+ .withClusteringSortColumns("")
+ .build())
+ .build();
+ PartitionAwareClusteringPlanStrategy strategyWithSortDisabled = new
FlinkSizeBasedClusteringPlanStrategy(table, context, configWithSortDisabled);
+ Stream<HoodieClusteringGroup> groupStreamWithOutSort =
strategyWithSortDisabled.buildClusteringGroupsForPartition(partition,fileSliceGroups);
+ assertEquals(0, groupStreamWithOutSort.count());
+ }
+
+ private FileSlice generateFileSlice(String partitionPath, String fileId,
String baseInstant) {
+ FileSlice fs = new FileSlice(new HoodieFileGroupId(partitionPath, fileId),
baseInstant);
+ fs.setBaseFile(new HoodieBaseFile(FSUtils.makeBaseFileName(baseInstant,
"1-0-1", fileId)));
+ return fs;
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index fe6d7c2efc2..d2fa89ec772 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -92,10 +92,12 @@ public class SparkSizeBasedClusteringPlanStrategy<T>
}
if (!currentGroup.isEmpty()) {
- int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
- LOG.info("Adding final clustering group " + totalSizeSoFar + " max
bytes: "
- + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: "
+ currentGroup.size() + " output groups: " + numOutputGroups);
- fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ if (currentGroup.size() > 1 ||
writeConfig.shouldClusteringSingleGroup()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
+ LOG.info("Adding final clustering group " + totalSizeSoFar + " max
bytes: "
+ + writeConfig.getClusteringMaxBytesInGroup() + " num input slices:
" + currentGroup.size() + " output groups: " + numOutputGroups);
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ }
}
return fileSliceGroups.stream().map(fileSliceGroup ->
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkBuildClusteringGroupsForPartition.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkBuildClusteringGroupsForPartition.java
new file mode 100644
index 00000000000..d12761012c4
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkBuildClusteringGroupsForPartition.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import
org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestSparkBuildClusteringGroupsForPartition {
+ @Mock
+ HoodieSparkCopyOnWriteTable table;
+ @Mock
+ HoodieSparkEngineContext context;
+ HoodieWriteConfig.Builder hoodieWriteConfigBuilder;
+
+ @BeforeEach
+ public void setUp() {
+ this.hoodieWriteConfigBuilder = HoodieWriteConfig
+ .newBuilder()
+ .withPath("path1");
+ }
+
+ @Test
+ public void testBuildClusteringGroupsForPartitionOnlyOneFile() {
+ String partition = "20221117";
+ String fileId = "fg-1";
+ List<FileSlice> fileSliceGroups = new ArrayList<>();
+ fileSliceGroups.add(generateFileSlice(partition, fileId, "0"));
+ // test buildClusteringGroupsForPartition with ClusteringSortColumns config
+ HoodieWriteConfig configWithSortEnabled =
hoodieWriteConfigBuilder.withClusteringConfig(
+ HoodieClusteringConfig.newBuilder()
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+ .withSingleGroupClusteringEnabled(false)
+ .withClusteringSortColumns("f0")
+ .build())
+ .build();
+ PartitionAwareClusteringPlanStrategy strategyWithSortEnabled = new
SparkSizeBasedClusteringPlanStrategy(table, context, configWithSortEnabled);
+ Stream<HoodieClusteringGroup> groupStreamSort =
strategyWithSortEnabled.buildClusteringGroupsForPartition(partition,fileSliceGroups);
+ assertEquals(1, groupStreamSort.count());
+
+ // test buildClusteringGroupsForPartition without ClusteringSortColumns
config
+ HoodieWriteConfig configWithSortDisabled =
hoodieWriteConfigBuilder.withClusteringConfig(
+ HoodieClusteringConfig.newBuilder()
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+ .withSingleGroupClusteringEnabled(false)
+ .withClusteringSortColumns("")
+ .build())
+ .build();
+ PartitionAwareClusteringPlanStrategy strategyWithSortDisabled = new
SparkSizeBasedClusteringPlanStrategy(table, context, configWithSortDisabled);
+ Stream<HoodieClusteringGroup> groupStreamWithOutSort =
strategyWithSortDisabled.buildClusteringGroupsForPartition(partition,fileSliceGroups);
+ assertEquals(0, groupStreamWithOutSort.count());
+ }
+
+ private FileSlice generateFileSlice(String partitionPath, String fileId,
String baseInstant) {
+ FileSlice fs = new FileSlice(new HoodieFileGroupId(partitionPath, fileId),
baseInstant);
+ fs.setBaseFile(new HoodieBaseFile(FSUtils.makeBaseFileName(baseInstant,
"1-0-1", fileId)));
+ return fs;
+ }
+}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index f334bbf3bc9..6c530833d55 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -686,6 +686,7 @@ public class TestHoodieRealtimeRecordReader {
JobConf newJobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
+ newJobConf.set("columns.types",
"string,string,string,string,string,string,string,string,bigint,string,string");
RecordReader<NullWritable, ArrayWritable> reader =
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
// use reader to read log file.
NullWritable key = reader.createKey();