SteNicholas commented on code in PR #8680:
URL: https://github.com/apache/hudi/pull/8680#discussion_r1191888621


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:
##########
@@ -259,93 +257,105 @@ private void cluster() throws Exception {
         table.getMetaClient().reloadActiveTimeline();
       }
 
-      // fetch the instant based on the configured execution sequence
-      List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
-      if (instants.isEmpty()) {
-        // do nothing.
-        LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
-        return;
-      }
-
-      final HoodieInstant clusteringInstant;
-      if (cfg.clusteringInstantTime != null) {
-        clusteringInstant = instants.stream()
-            .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
-            .findFirst()
-            .orElseThrow(() -> new HoodieException("Clustering instant [" + cfg.clusteringInstantTime + "] not found"));
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+      int clusteringParallelism;
+      DataStream<ClusteringPlanEvent> planStream;
+      HoodieInstant clusteringInstant = null;
+      if (serviceMode) {
+        clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+        planStream = env.addSource(new ServiceSourceFunction(conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)))
+            .name("clustering_service_source")

Review Comment:
   @danny0405, I don't think we need exactly-once semantics here, because this only 
requires adding a long-running source to create the clustering execution 
pipeline. The semantics are guaranteed by `ClusteringPlanOperator`, 
`ClusteringOperator` and `ClusteringCommitSink`. I have also tested the 
streaming job internally.
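
   To picture the argument, here is a minimal toy sketch in plain Java. It is not the PR's `ServiceSourceFunction` and uses no Flink or Hudi APIs; `TickSourceSketch`, `drain` and the instant names are hypothetical. It assumes the downstream step looks up the pending work on every tick, so the source only has to stay alive and emit ticks that carry no state of their own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a long-running trigger source; not Hudi code. */
public class TickSourceSketch {
    private final long intervalMillis;
    private volatile boolean running = true;

    public TickSourceSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Emits a tick per interval until cancelled or maxTicks is reached. */
    public void run(LongConsumer downstream, int maxTicks) {
        for (int i = 0; running && i < maxTicks; i++) {
            // The tick is only a wake-up signal; it holds nothing to checkpoint.
            downstream.accept(System.currentTimeMillis());
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void cancel() {
        running = false;
    }

    /**
     * Toy downstream (assumption): each tick picks the earliest pending
     * instant, if any, and marks it done. Extra ticks are no-ops.
     */
    public static List<String> drain(TreeSet<String> pending, int ticks) {
        List<String> completed = new ArrayList<>();
        new TickSourceSketch(1).run(tick -> {
            String next = pending.pollFirst();
            if (next != null) {
                completed.add(next);
            }
        }, ticks);
        return completed;
    }

    public static void main(String[] args) {
        TreeSet<String> pending = new TreeSet<>(Arrays.asList("001", "002"));
        // Five ticks against two pending instants: each instant completes once.
        System.out.println(drain(pending, 5));
    }
}
```

   In this toy model, surplus or repeated ticks do not repeat any work, because what gets processed is decided by the pending set and not by the tick.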



