This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new efd9dab14c [HUDI-4551] Tweak the default parallelism of flink pipeline
to execution env parallelism (#6312)
efd9dab14c is described below

commit efd9dab14cdea9ec2b09d79499fa95d5584df4fa
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Aug 18 16:13:28 2022 +0800

    [HUDI-4551] Tweak the default parallelism of flink pipeline to execution
env parallelism (#6312)
---
 .../apache/hudi/configuration/FlinkOptions.java    | 20 +++----
 .../hudi/configuration/OptionsInference.java       | 65 ++++++++++++++++++++++
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 20 +++----
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  7 ++-
 .../org/apache/hudi/table/HoodieTableSink.java     |  9 +--
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  8 ++-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  |  4 +-
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |  8 +--
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  4 +-
 10 files changed, 106 insertions(+), 41 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3638113288..7b78fb8d6a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -153,8 +153,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
       .key("read.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual read, default is 
4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual read, default is 
the parallelism of the execution environment");
 
   public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = 
ConfigOptions
       .key("source.avro-schema.path")
@@ -395,19 +395,19 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.index_bootstrap.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do index bootstrap, default 
is the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do index bootstrap, default 
same as the sink parallelism");
 
   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
       .key("write.bucket_assign.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do bucket assign, default is 
the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do bucket assign, default 
same as the write task parallelism");
 
   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
       .key("write.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual write, default is 
4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual write, default is 
the parallelism of the execution environment");
 
   public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
       .key("write.task.max.size")
@@ -512,8 +512,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 
(assumes 5 commits generate one bucket)
-      .withDescription("Parallelism of tasks that do actual compaction, 
default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual compaction, 
default same as the write task parallelism");
 
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
@@ -630,8 +630,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
       .key("clustering.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual clustering, 
default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual clustering, 
default same as the write task parallelism");
 
   public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = 
ConfigOptions
       .key("clustering.plan.strategy.daybased.lookback.partitions")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
new file mode 100644
index 0000000000..3e02d23732
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.configuration;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Tool helping to infer the flink options {@link FlinkOptions}.
+ */
+public class OptionsInference {
+
+  /**
+   * Sets up the default source task parallelism if it is not specified.
+   *
+   * @param conf     The configuration
+   * @param envTasks The parallelism of the execution env
+   */
+  public static void setupSourceTasks(Configuration conf, int envTasks) {
+    if (!conf.contains(FlinkOptions.READ_TASKS)) {
+      conf.setInteger(FlinkOptions.READ_TASKS, envTasks);
+    }
+  }
+
+  /**
+   * Sets up the default sink tasks parallelism if it is not specified.
+   *
+   * @param conf     The configuration
+   * @param envTasks The parallelism of the execution env
+   */
+  public static void setupSinkTasks(Configuration conf, int envTasks) {
+    // write task number, default same as execution env tasks
+    if (!conf.contains(FlinkOptions.WRITE_TASKS)) {
+      conf.setInteger(FlinkOptions.WRITE_TASKS, envTasks);
+    }
+    int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+    // bucket assign tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.BUCKET_ASSIGN_TASKS)) {
+      conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, writeTasks);
+    }
+    // compaction tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.COMPACTION_TASKS)) {
+      conf.setInteger(FlinkOptions.COMPACTION_TASKS, writeTasks);
+    }
+    // clustering tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.CLUSTERING_TASKS)) {
+      conf.setInteger(FlinkOptions.CLUSTERING_TASKS, writeTasks);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 0341d0af7f..18b27daeb9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -209,9 +209,8 @@ public class Pipelines {
   public static DataStream<HoodieRecord> bootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream) {
-    return bootstrap(conf, rowType, defaultParallelism, dataStream, false, 
false);
+    return bootstrap(conf, rowType, dataStream, false, false);
   }
 
   /**
@@ -221,7 +220,6 @@ public class Pipelines {
    *
    * @param conf               The configuration
    * @param rowType            The row type
-   * @param defaultParallelism The default parallelism
    * @param dataStream         The data stream
    * @param bounded            Whether the source is bounded
    * @param overwrite          Whether it is insert overwrite
@@ -229,7 +227,6 @@ public class Pipelines {
   public static DataStream<HoodieRecord> bootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream,
       boolean bounded,
       boolean overwrite) {
@@ -237,16 +234,15 @@ public class Pipelines {
     if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
       return rowDataToHoodieRecord(conf, rowType, dataStream);
     } else if (bounded && !globalIndex && 
OptionsResolver.isPartitionedTable(conf)) {
-      return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
+      return boundedBootstrap(conf, rowType, dataStream);
     } else {
-      return streamBootstrap(conf, rowType, defaultParallelism, dataStream, 
bounded);
+      return streamBootstrap(conf, rowType, dataStream, bounded);
     }
   }
 
   private static DataStream<HoodieRecord> streamBootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream,
       boolean bounded) {
     DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, 
rowType, dataStream);
@@ -257,7 +253,7 @@ public class Pipelines {
               "index_bootstrap",
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
-          
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+          
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
           .uid("uid_index_bootstrap_" + 
conf.getString(FlinkOptions.TABLE_NAME));
     }
 
@@ -272,7 +268,6 @@ public class Pipelines {
   private static DataStream<HoodieRecord> boundedBootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream) {
     final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
     // shuffle by partition keys
@@ -284,7 +279,7 @@ public class Pipelines {
             "batch_index_bootstrap",
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
-        
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+        
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
         .uid("uid_batch_index_bootstrap_" + 
conf.getString(FlinkOptions.TABLE_NAME));
   }
 
@@ -315,11 +310,10 @@ public class Pipelines {
    * and flushes the data set to disk.
    *
    * @param conf               The configuration
-   * @param defaultParallelism The default parallelism
    * @param dataStream         The input data stream
    * @return the stream write data stream pipeline
    */
-  public static DataStream<Object> hoodieStreamWrite(Configuration conf, int 
defaultParallelism, DataStream<HoodieRecord> dataStream) {
+  public static DataStream<Object> hoodieStreamWrite(Configuration conf, 
DataStream<HoodieRecord> dataStream) {
     if (OptionsResolver.isBucketIndexType(conf)) {
       WriteOperatorFactory<HoodieRecord> operatorFactory = 
BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -339,7 +333,7 @@ public class Pipelines {
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
           .uid("uid_bucket_assigner_" + 
conf.getString(FlinkOptions.TABLE_NAME))
-          
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
+          .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
           .transform(opIdentifier("stream_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 29f55f78ac..b153b2273c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -76,7 +77,6 @@ public class HoodieFlinkStreamer {
             .getLogicalType();
 
     long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
-    int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
     DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
@@ -98,8 +98,9 @@ public class HoodieFlinkStreamer {
       }
     }
 
-    DataStream<HoodieRecord> hoodieRecordDataStream = 
Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
-    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, 
parallelism, hoodieRecordDataStream);
+    OptionsInference.setupSinkTasks(conf, env.getParallelism());
+    DataStream<HoodieRecord> hoodieRecordDataStream = 
Pipelines.bootstrap(conf, rowType, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, 
hoodieRecordDataStream);
     if (OptionsResolver.needsAsyncCompaction(conf)) {
       Pipelines.compact(conf, pipeline);
     } else {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 5af86867d8..f8799d3ac9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -22,6 +22,7 @@ import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.ChangelogModes;
@@ -66,6 +67,8 @@ public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning,
       long ckpTimeout = dataStream.getExecutionEnvironment()
           .getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+      // set up default parallelism
+      OptionsInference.setupSinkTasks(conf, 
dataStream.getExecutionConfig().getParallelism());
 
       RowType rowType = (RowType) 
schema.toSinkRowDataType().notNull().getLogicalType();
 
@@ -85,14 +88,12 @@ public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning,
         }
       }
 
-      // default parallelism
-      int parallelism = dataStream.getExecutionConfig().getParallelism();
       DataStream<Object> pipeline;
       // bootstrap
       final DataStream<HoodieRecord> hoodieRecordDataStream =
-          Pipelines.bootstrap(conf, rowType, parallelism, dataStream, 
context.isBounded(), overwrite);
+          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), 
overwrite);
       // write pipeline
-      pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, 
hoodieRecordDataStream);
+      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
       // compaction
       if (OptionsResolver.needsAsyncCompaction(conf)) {
         // use synchronous compaction for bounded source.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 2034cb322e..2deb33d842 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.source.FileIndex;
@@ -179,6 +180,7 @@ public class HoodieTableSource implements
         @SuppressWarnings("unchecked")
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+        OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
         if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
           StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
               conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, getRequiredPartitionPaths());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 680c4d02e2..aa420a433d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -239,9 +240,9 @@ public class ITTestDataStreamWrite extends TestLogger {
       dataStream = transformer.get().apply(dataStream);
     }
 
-    int parallelism = execEnv.getParallelism();
-    DataStream<HoodieRecord> hoodieRecordDataStream = 
Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
-    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, 
parallelism, hoodieRecordDataStream);
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+    DataStream<HoodieRecord> hoodieRecordDataStream = 
Pipelines.bootstrap(conf, rowType, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, 
hoodieRecordDataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     if (isMor) {
@@ -305,6 +306,7 @@ public class ITTestDataStreamWrite extends TestLogger {
           .setParallelism(4);
     }
 
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
     DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, 
true);
     execEnv.addOperator(pipeline.getTransformation());
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index aba8e4c7b4..a0073d8a37 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -88,7 +88,7 @@ public class ITTestHoodieFlinkClustering {
     EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
 
@@ -187,7 +187,7 @@ public class ITTestHoodieFlinkClustering {
     EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index a8b78ab64d..428f65f37c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -101,7 +101,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
@@ -173,7 +173,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -214,7 +214,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -294,7 +294,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-            
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+            
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index bf3abf74b8..66af39e4c1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -84,7 +84,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
     streamTableEnv = TableEnvironmentImpl.create(settings);
     streamTableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Configuration execConf = streamTableEnv.getConfig().getConfiguration();
     execConf.setString("execution.checkpointing.interval", "2s");
     // configure not to retry after failure
@@ -93,7 +93,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
 
     batchTableEnv = TestTableEnvs.getBatchTableEnv();
     batchTableEnv.getConfig().getConfiguration()
-        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
   }
 
   @TempDir

Reply via email to