This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ac9f46c1ebb [HUDI-9532] Add sort columns in plan generated by Flink (#13455)
ac9f46c1ebb is described below
commit ac9f46c1ebbecf4bdc7578229a0afbce9ea57641
Author: YueZhang <[email protected]>
AuthorDate: Wed Jun 18 20:09:56 2025 +0800
[HUDI-9532] Add sort columns in plan generated by Flink (#13455)
---
.../org/apache/hudi/util/FlinkWriteClients.java | 1 +
.../apache/hudi/sink/ITTestDataStreamWrite.java | 29 +++++++++++++++++++++-
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 6e0b26a4f23..70929c142d2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -180,6 +180,7 @@ public class FlinkWriteClients {
.withClusteringPartitionRegexPattern(conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_PARTITION_REGEX_PATTERN))
.withClusteringPartitionSelected(conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_PARTITION_SELECTED))
.withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
+      .withClusteringSortColumns(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS))
.withScheduleInlineClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
.build())
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 8aca31b2173..c1dd8c8d53e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -18,9 +18,15 @@
package org.apache.hudi.sink;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
@@ -29,6 +35,7 @@ import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.catalog.HoodieCatalog;
import org.apache.hudi.table.catalog.TableOptionProperties;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -74,10 +81,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for Flink Hoodie stream sink.
@@ -171,7 +180,8 @@ public class ITTestDataStreamWrite extends TestLogger {
@ParameterizedTest
@ValueSource(booleans = {true, false})
  public void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    String basePath = tempFile.toURI().toString();
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
@@ -180,6 +190,23 @@ public class ITTestDataStreamWrite extends TestLogger {
}
    writeWithClusterAndCheckExpected(conf, "cow_write_with_cluster", 1, EXPECTED);
+    if (sortClusteringEnabled) {
+      HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+      HoodieTableMetaClient metaClient =
+          HoodieTestUtils.createMetaClient(storageConf, basePath);
+      HoodieInstant clusteringInstant = metaClient.getActiveTimeline().getLastClusteringInstant().get();
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+          metaClient, clusteringInstant);
+      assertTrue(clusteringPlanOption.isPresent());
+      HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+      Map<String, String> strategyParams = clusteringPlan.getStrategy().getStrategyParams();
+      // could be used in spark MultipleSparkJobExecutionStrategy
+      Option<String[]> orderByColumnsOpt =
+          Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
+              .map(listStr -> listStr.split(","));
+      assertTrue(orderByColumnsOpt.isPresent());
+      assertTrue(orderByColumnsOpt.get()[0].equalsIgnoreCase("uuid"));
+    }
}
@ParameterizedTest