This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new efd9dab14c [HUDI-4551] Tweak the default parallelism of flink pipeline to execution env parallelism (#6312)
efd9dab14c is described below
commit efd9dab14cdea9ec2b09d79499fa95d5584df4fa
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Aug 18 16:13:28 2022 +0800
[HUDI-4551] Tweak the default parallelism of flink pipeline to execution env parallelism (#6312)
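In effect, operators that previously defaulted to a hard-coded parallelism of 4 (read, write, compaction, clustering) now inherit the parallelism of the execution environment unless the corresponding option is set explicitly. A minimal sketch of the new behavior, assuming a standard DataStream job (the parallelism value 8 is illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.configuration.OptionsInference;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(8);
    Configuration conf = new Configuration();
    // write.tasks is left unset, so the inference falls back to the
    // env parallelism (8) instead of the old fixed default of 4.
    OptionsInference.setupSinkTasks(conf, env.getParallelism());
    assert conf.getInteger(FlinkOptions.WRITE_TASKS) == 8;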
---
.../apache/hudi/configuration/FlinkOptions.java | 20 +++----
.../hudi/configuration/OptionsInference.java | 65 ++++++++++++++++++++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 20 +++----
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 7 ++-
.../org/apache/hudi/table/HoodieTableSink.java | 9 +--
.../org/apache/hudi/table/HoodieTableSource.java | 2 +
.../apache/hudi/sink/ITTestDataStreamWrite.java | 8 ++-
.../sink/cluster/ITTestHoodieFlinkClustering.java | 4 +-
.../sink/compact/ITTestHoodieFlinkCompactor.java | 8 +--
.../apache/hudi/table/ITTestHoodieDataSource.java | 4 +-
10 files changed, 106 insertions(+), 41 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3638113288..7b78fb8d6a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -153,8 +153,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
- .defaultValue(4)
- .withDescription("Parallelism of tasks that do actual read, default is
4");
+ .noDefaultValue()
+ .withDescription("Parallelism of tasks that do actual read, default is
the parallelism of the execution environment");
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH =
ConfigOptions
.key("source.avro-schema.path")
@@ -395,19 +395,19 @@ public class FlinkOptions extends HoodieConfig {
.key("write.index_bootstrap.tasks")
.intType()
.noDefaultValue()
- .withDescription("Parallelism of tasks that do index bootstrap, default
is the parallelism of the execution environment");
+ .withDescription("Parallelism of tasks that do index bootstrap, default
same as the sink parallelism");
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
.key("write.bucket_assign.tasks")
.intType()
.noDefaultValue()
- .withDescription("Parallelism of tasks that do bucket assign, default is
the parallelism of the execution environment");
+ .withDescription("Parallelism of tasks that do bucket assign, default
same as the write task parallelism");
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
- .defaultValue(4)
- .withDescription("Parallelism of tasks that do actual write, default is
4");
+ .noDefaultValue()
+ .withDescription("Parallelism of tasks that do actual write, default is
the parallelism of the execution environment");
public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
.key("write.task.max.size")
@@ -512,8 +512,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
.key("compaction.tasks")
.intType()
- .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
- .withDescription("Parallelism of tasks that do actual compaction, default is 4");
+ .noDefaultValue()
+ .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
@@ -630,8 +630,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
.key("clustering.tasks")
.intType()
- .defaultValue(4)
- .withDescription("Parallelism of tasks that do actual clustering,
default is 4");
+ .noDefaultValue()
+ .withDescription("Parallelism of tasks that do actual clustering,
default same as the write task parallelism");
public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS =
ConfigOptions
.key("clustering.plan.strategy.daybased.lookback.partitions")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
new file mode 100644
index 0000000000..3e02d23732
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.configuration;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Tool helping to infer the flink options {@link FlinkOptions}.
+ */
+public class OptionsInference {
+
+ /**
+ * Sets up the default source task parallelism if it is not specified.
+ *
+ * @param conf The configuration
+ * @param envTasks The parallelism of the execution env
+ */
+ public static void setupSourceTasks(Configuration conf, int envTasks) {
+ if (!conf.contains(FlinkOptions.READ_TASKS)) {
+ conf.setInteger(FlinkOptions.READ_TASKS, envTasks);
+ }
+ }
+
+ /**
+ * Sets up the default sink tasks parallelism if it is not specified.
+ *
+ * @param conf The configuration
+ * @param envTasks The parallelism of the execution env
+ */
+ public static void setupSinkTasks(Configuration conf, int envTasks) {
+ // write task number, default same as execution env tasks
+ if (!conf.contains(FlinkOptions.WRITE_TASKS)) {
+ conf.setInteger(FlinkOptions.WRITE_TASKS, envTasks);
+ }
+ int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+ // bucket assign tasks, default same as write tasks
+ if (!conf.contains(FlinkOptions.BUCKET_ASSIGN_TASKS)) {
+ conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, writeTasks);
+ }
+ // compaction tasks, default same as write tasks
+ if (!conf.contains(FlinkOptions.COMPACTION_TASKS)) {
+ conf.setInteger(FlinkOptions.COMPACTION_TASKS, writeTasks);
+ }
+ // clustering tasks, default same as write tasks
+ if (!conf.contains(FlinkOptions.CLUSTERING_TASKS)) {
+ conf.setInteger(FlinkOptions.CLUSTERING_TASKS, writeTasks);
+ }
+ }
+}
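Note the cascade in setupSinkTasks: the env parallelism only seeds write.tasks, while bucket assign, compaction, and clustering default to the (possibly user-set) write task parallelism. A small sketch of that subtlety (values illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.configuration.OptionsInference;

    Configuration conf = new Configuration();
    conf.setInteger(FlinkOptions.WRITE_TASKS, 6); // only the write parallelism is pinned
    OptionsInference.setupSinkTasks(conf, 12);    // env parallelism 12 is not used here
    // The unset options cascade from write.tasks, not from the env parallelism:
    assert conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS) == 6;
    assert conf.getInteger(FlinkOptions.COMPACTION_TASKS) == 6;
    assert conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == 6;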
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 0341d0af7f..18b27daeb9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -209,9 +209,8 @@ public class Pipelines {
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
- int defaultParallelism,
DataStream<RowData> dataStream) {
- return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
+ return bootstrap(conf, rowType, dataStream, false, false);
}
/**
@@ -221,7 +220,6 @@ public class Pipelines {
*
* @param conf The configuration
* @param rowType The row type
- * @param defaultParallelism The default parallelism
* @param dataStream The data stream
* @param bounded Whether the source is bounded
* @param overwrite Whether it is insert overwrite
@@ -229,7 +227,6 @@ public class Pipelines {
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
- int defaultParallelism,
DataStream<RowData> dataStream,
boolean bounded,
boolean overwrite) {
@@ -237,16 +234,15 @@ public class Pipelines {
if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
return rowDataToHoodieRecord(conf, rowType, dataStream);
} else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
- return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
+ return boundedBootstrap(conf, rowType, dataStream);
} else {
- return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
+ return streamBootstrap(conf, rowType, dataStream, bounded);
}
}
private static DataStream<HoodieRecord> streamBootstrap(
Configuration conf,
RowType rowType,
- int defaultParallelism,
DataStream<RowData> dataStream,
boolean bounded) {
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
@@ -257,7 +253,7 @@ public class Pipelines {
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new BootstrapOperator<>(conf))
- .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+ .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
@@ -272,7 +268,6 @@ public class Pipelines {
private static DataStream<HoodieRecord> boundedBootstrap(
Configuration conf,
RowType rowType,
- int defaultParallelism,
DataStream<RowData> dataStream) {
final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
// shuffle by partition keys
@@ -284,7 +279,7 @@ public class Pipelines {
"batch_index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new BatchBootstrapOperator<>(conf))
- .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+ .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
@@ -315,11 +310,10 @@ public class Pipelines {
* and flushes the data set to disk.
*
* @param conf The configuration
- * @param defaultParallelism The default parallelism
* @param dataStream The input data stream
* @return the stream write data stream pipeline
*/
- public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
+ public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
if (OptionsResolver.isBucketIndexType(conf)) {
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -339,7 +333,7 @@ public class Pipelines {
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" +
conf.getString(FlinkOptions.TABLE_NAME))
-
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
+ .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
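With the defaultParallelism parameter gone, callers simply build the pipeline from the stream; operators pick their parallelism from the inferred options or from the input stream. A hedged sketch of the new call shape (conf, rowType, and dataStream as set up by the caller, e.g. in HoodieFlinkStreamer below):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.sink.utils.Pipelines;

    DataStream<HoodieRecord> records = Pipelines.bootstrap(conf, rowType, dataStream);
    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, records);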
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 29f55f78ac..b153b2273c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
@@ -76,7 +77,6 @@ public class HoodieFlinkStreamer {
.getLogicalType();
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
- int parallelism = env.getParallelism();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
@@ -98,8 +98,9 @@ public class HoodieFlinkStreamer {
}
}
- DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
- DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+ OptionsInference.setupSinkTasks(conf, env.getParallelism());
+ DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
if (OptionsResolver.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
} else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 5af86867d8..f8799d3ac9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -22,6 +22,7 @@ import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
@@ -66,6 +67,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ // set up default parallelism
+ OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
@@ -85,14 +88,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
}
}
- // default parallelism
- int parallelism = dataStream.getExecutionConfig().getParallelism();
DataStream<Object> pipeline;
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =
- Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
+ Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// write pipeline
- pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+ pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// compaction
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 2034cb322e..2deb33d842 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.source.FileIndex;
@@ -179,6 +180,7 @@ public class HoodieTableSource implements
@SuppressWarnings("unchecked")
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+ OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 680c4d02e2..aa420a433d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
@@ -239,9 +240,9 @@ public class ITTestDataStreamWrite extends TestLogger {
dataStream = transformer.get().apply(dataStream);
}
- int parallelism = execEnv.getParallelism();
- DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
- DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+ OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+ DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
execEnv.addOperator(pipeline.getTransformation());
if (isMor) {
@@ -305,6 +306,7 @@ public class ITTestDataStreamWrite extends TestLogger {
.setParallelism(4);
}
+ OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
execEnv.addOperator(pipeline.getTransformation());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index aba8e4c7b4..a0073d8a37 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -88,7 +88,7 @@ public class ITTestHoodieFlinkClustering {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -187,7 +187,7 @@ public class ITTestHoodieFlinkClustering {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index a8b78ab64d..428f65f37c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -101,7 +101,7 @@ public class ITTestHoodieFlinkCompactor {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
@@ -173,7 +173,7 @@ public class ITTestHoodieFlinkCompactor {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -214,7 +214,7 @@ public class ITTestHoodieFlinkCompactor {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -294,7 +294,7 @@ public class ITTestHoodieFlinkCompactor {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index bf3abf74b8..66af39e4c1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -84,7 +84,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
streamTableEnv = TableEnvironmentImpl.create(settings);
streamTableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Configuration execConf = streamTableEnv.getConfig().getConfiguration();
execConf.setString("execution.checkpointing.interval", "2s");
// configure not to retry after failure
@@ -93,7 +93,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
batchTableEnv = TestTableEnvs.getBatchTableEnv();
batchTableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
}
@TempDir