This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f39125441e0 [HUDI-6396] Flink supports schedule the clustering in
batch execution mode and code refactor (#9004)
f39125441e0 is described below
commit f39125441e011e2043924b8e47aa55e36c918dbc
Author: ksmou <[email protected]>
AuthorDate: Mon Jun 19 09:51:44 2023 +0800
[HUDI-6396] Flink supports schedule the clustering in batch execution mode
and code refactor (#9004)
* Flink supports schedule the clustering in batch execution mode and code
refactor
* change method name
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e6076171863..df473584841 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -256,16 +256,8 @@ public class StreamWriteOperatorCoordinator
           // the stream write task snapshot and flush the data buffer synchronously in sequence,
           // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
           final boolean committed = commitInstant(this.instant, checkpointId);
-
-          if (tableState.scheduleCompaction) {
-            // if async compaction is on, schedule the compaction
-            CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
-          }
-
-          if (tableState.scheduleClustering) {
-            // if async clustering is on, schedule the clustering
-            ClusteringUtil.scheduleClustering(conf, writeClient, committed);
-          }
+          // schedules the compaction or clustering if it is enabled in stream execution mode
+          scheduleTableServices(committed);
if (committed) {
// start new instant.
@@ -452,15 +444,23 @@ public class StreamWriteOperatorCoordinator
         Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
         // sync Hive synchronously if it is enabled in batch mode.
         syncHive();
-        // schedules the compaction plan in batch execution mode
-        if (tableState.scheduleCompaction) {
-          // if async compaction is on, schedule the compaction
-          CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, true);
-        }
+        // schedules the compaction or clustering if it is enabled in batch execution mode
+        scheduleTableServices(true);
       }
     }
   }
+  private void scheduleTableServices(Boolean committed) {
+    // if compaction is on, schedule the compaction
+    if (tableState.scheduleCompaction) {
+      CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
+    }
+    // if clustering is on, schedule the clustering
+    if (tableState.scheduleClustering) {
+      ClusteringUtil.scheduleClustering(conf, writeClient, committed);
+    }
+  }
+
   private void handleWriteMetaEvent(WriteMetadataEvent event) {
     // the write task does not block after checkpointing(and before it receives a checkpoint success event),
     // if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint