danny0405 commented on code in PR #8680:
URL: https://github.com/apache/hudi/pull/8680#discussion_r1190968991
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:
##########
@@ -259,93 +257,105 @@ private void cluster() throws Exception {
table.getMetaClient().reloadActiveTimeline();
}
- // fetch the instant based on the configured execution sequence
- List<HoodieInstant> instants =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
- if (instants.isEmpty()) {
- // do nothing.
- LOG.info("No clustering plan scheduled, turns on the clustering plan
schedule with --schedule option");
- return;
- }
-
- final HoodieInstant clusteringInstant;
- if (cfg.clusteringInstantTime != null) {
- clusteringInstant = instants.stream()
- .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
- .findFirst()
- .orElseThrow(() -> new HoodieException("Clustering instant [" +
cfg.clusteringInstantTime + "] not found"));
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+ int clusteringParallelism;
+ DataStream<ClusteringPlanEvent> planStream;
+ HoodieInstant clusteringInstant = null;
+ if (serviceMode) {
+ clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+ planStream = env.addSource(new
ServiceSourceFunction(conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)))
+ .name("clustering_service_source")
Review Comment:
Do we need exactly-once semantics for this streaming job?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:
##########
@@ -259,93 +257,105 @@ private void cluster() throws Exception {
table.getMetaClient().reloadActiveTimeline();
}
- // fetch the instant based on the configured execution sequence
- List<HoodieInstant> instants =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
- if (instants.isEmpty()) {
- // do nothing.
- LOG.info("No clustering plan scheduled, turns on the clustering plan
schedule with --schedule option");
- return;
- }
-
- final HoodieInstant clusteringInstant;
- if (cfg.clusteringInstantTime != null) {
- clusteringInstant = instants.stream()
- .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
- .findFirst()
- .orElseThrow(() -> new HoodieException("Clustering instant [" +
cfg.clusteringInstantTime + "] not found"));
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+ int clusteringParallelism;
+ DataStream<ClusteringPlanEvent> planStream;
+ HoodieInstant clusteringInstant = null;
+ if (serviceMode) {
+ clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+ planStream = env.addSource(new
ServiceSourceFunction(conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)))
+ .name("clustering_service_source")
+ .uid("uid_clustering_service_source")
+ .setParallelism(1)
+ .transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class), new ClusteringPlanOperator(conf))
+ .setParallelism(1);
} else {
- // check for inflight clustering plans and roll them back if required
- clusteringInstant =
- CompactionUtil.isLIFO(cfg.clusteringSeq) ?
instants.get(instants.size() - 1) : instants.get(0);
- }
+ // fetch the instant based on the configured execution sequence
+ List<HoodieInstant> instants =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
+ if (instants.isEmpty()) {
Review Comment:
I was prospecting the whole logic `#cluster` should be shared in the
streaming source function, the function can start a timeline service and it can
generate compaction/clustering plan periodically like what we do now in client
Thread with a while loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]