This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ac9f46c1ebb [HUDI-9532] Add sort columns in plan generated by Flink (#13455)
ac9f46c1ebb is described below

commit ac9f46c1ebbecf4bdc7578229a0afbce9ea57641
Author: YueZhang <[email protected]>
AuthorDate: Wed Jun 18 20:09:56 2025 +0800

    [HUDI-9532] Add sort columns in plan generated by Flink (#13455)
---
 .../org/apache/hudi/util/FlinkWriteClients.java    |  1 +
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 29 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 6e0b26a4f23..70929c142d2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -180,6 +180,7 @@ public class FlinkWriteClients {
                     .withClusteringPartitionRegexPattern(conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_PARTITION_REGEX_PATTERN))
                     .withClusteringPartitionSelected(conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_PARTITION_SELECTED))
                     .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
+                    .withClusteringSortColumns(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS))
                     .withScheduleInlineClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
                     .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
                     .build())
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 8aca31b2173..c1dd8c8d53e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -18,9 +18,15 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
@@ -29,6 +35,7 @@ import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.catalog.HoodieCatalog;
 import org.apache.hudi.table.catalog.TableOptionProperties;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -74,10 +81,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 import static 
org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE;
 import static 
org.apache.hudi.config.HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP;
 import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
 import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Integration test for Flink Hoodie stream sink.
@@ -171,7 +180,8 @@ public class ITTestDataStreamWrite extends TestLogger {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testWriteCopyOnWriteWithClustering(boolean 
sortClusteringEnabled) throws Exception {
-    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    String basePath = tempFile.toURI().toString();
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
@@ -180,6 +190,23 @@ public class ITTestDataStreamWrite extends TestLogger {
     }
 
     writeWithClusterAndCheckExpected(conf, "cow_write_with_cluster", 1, 
EXPECTED);
+    if (sortClusteringEnabled) {
+      HadoopStorageConfiguration storageConf = new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+      HoodieTableMetaClient metaClient =
+          HoodieTestUtils.createMetaClient(storageConf, basePath);
+      HoodieInstant clusteringInstant = 
metaClient.getActiveTimeline().getLastClusteringInstant().get();
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = 
ClusteringUtils.getClusteringPlan(
+          metaClient, clusteringInstant);
+      assertTrue(clusteringPlanOption.isPresent());
+      HoodieClusteringPlan clusteringPlan = 
clusteringPlanOption.get().getRight();
+      Map<String, String> strategyParams = 
clusteringPlan.getStrategy().getStrategyParams();
+      // could be used in spark MultipleSparkJobExecutionStrategy
+      Option<String[]> orderByColumnsOpt =
+          
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
+              .map(listStr -> listStr.split(","));
+      assertTrue(orderByColumnsOpt.isPresent());
+      assertTrue(orderByColumnsOpt.get()[0].equalsIgnoreCase("uuid"));
+    }
   }
 
   @ParameterizedTest

Reply via email to