This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c501d11eba [HUDI-5489] Flink offline compactor throws exception in
service mode (#7588)
c501d11eba is described below
commit c501d11eba5aab7cfd597597cf4a53a32ba6f38e
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jan 3 16:13:01 2023 +0800
[HUDI-5489] Flink offline compactor throws exception in service mode (#7588)
In some of the execution modes, the execution env can only handle a single
job, so in service mode we instantiate a fresh execution env for each run
instead of reusing a global singleton.
---
.../hudi/sink/clustering/HoodieFlinkClusteringJob.java | 14 ++++----------
.../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 13 +++----------
.../hudi/sink/cluster/ITTestHoodieFlinkClustering.java | 3 +--
.../hudi/sink/compact/ITTestHoodieFlinkCompactor.java | 3 +--
4 files changed, 9 insertions(+), 24 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 1942b1ce29..b451c36418 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -76,12 +76,10 @@ public class HoodieFlinkClusteringJob {
}
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
FlinkClusteringConfig cfg = getFlinkClusteringConfig(args);
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
- AsyncClusteringService service = new AsyncClusteringService(cfg, conf,
env);
+ AsyncClusteringService service = new AsyncClusteringService(cfg, conf);
new HoodieFlinkClusteringJob(service).start(cfg.serviceMode);
}
@@ -165,20 +163,14 @@ public class HoodieFlinkClusteringJob {
*/
private final HoodieFlinkTable<?> table;
- /**
- * Flink Execution Environment.
- */
- private final StreamExecutionEnvironment env;
-
/**
* Executor Service.
*/
private final ExecutorService executor;
- public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration
conf, StreamExecutionEnvironment env) throws Exception {
+ public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration
conf) throws Exception {
this.cfg = cfg;
this.conf = conf;
- this.env = env;
this.executor = Executors.newFixedThreadPool(1);
// create metaClient
@@ -338,6 +330,8 @@ public class HoodieFlinkClusteringJob {
final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
// setup configuration
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 1475a493c1..ea1fbdcc5d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -69,12 +69,10 @@ public class HoodieFlinkCompactor {
}
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
- AsyncCompactionService service = new AsyncCompactionService(cfg, conf,
env);
+ AsyncCompactionService service = new AsyncCompactionService(cfg, conf);
new HoodieFlinkCompactor(service).start(cfg.serviceMode);
}
@@ -157,20 +155,14 @@ public class HoodieFlinkCompactor {
*/
private final HoodieFlinkTable<?> table;
- /**
- * Flink Execution Environment.
- */
- private final StreamExecutionEnvironment env;
-
/**
* Executor Service.
*/
private final ExecutorService executor;
- public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration
conf, StreamExecutionEnvironment env) throws Exception {
+ public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration
conf) throws Exception {
this.cfg = cfg;
this.conf = conf;
- this.env = env;
this.executor = Executors.newFixedThreadPool(1);
// create metaClient
@@ -304,6 +296,7 @@ public class HoodieFlinkCompactor {
}
table.getMetaClient().reloadActiveTimeline();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CompactionPlanSourceFunction(compactionPlans))
.name("compaction_source")
.uid("uid_compaction_source")
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index f2273e40a2..29f280e612 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -208,14 +208,13 @@ public class ITTestHoodieFlinkClustering {
TimeUnit.SECONDS.sleep(3);
// Make configuration and setAvroSchema.
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
cfg.path = tempFile.getAbsolutePath();
cfg.minClusteringIntervalSeconds = 3;
cfg.schedule = true;
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
- HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService =
new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env);
+ HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService =
new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf);
asyncClusteringService.start(null);
// wait for the asynchronous commit to finish
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 6157b5e901..0ad78890aa 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -191,7 +191,6 @@ public class ITTestHoodieFlinkCompactor {
tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
// Make configuration and setAvroSchema.
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
cfg.minCompactionIntervalSeconds = 3;
@@ -200,7 +199,7 @@ public class ITTestHoodieFlinkCompactor {
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(),
FlinkMiniCluster.DEFAULT_PARALLELISM);
- HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new
HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
+ HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new
HoodieFlinkCompactor.AsyncCompactionService(cfg, conf);
asyncCompactionService.start(null);
// wait for the asynchronous commit to finish