This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bafd8e29ad [Fix][Spark] Fix source parallelism not working with Spark 
engine (#9319)
bafd8e29ad is described below

commit bafd8e29ad9d727c0c9f4c93e7c5af1ab0dd6c89
Author: Junxin Xiao <[email protected]>
AuthorDate: Wed May 28 21:22:09 2025 +0800

    [Fix][Spark] Fix source parallelism not working with Spark engine (#9319)
---
 .../core/starter/spark/SparkCommandArgsTest.java   | 33 +++++++++++++++
 .../resources/config/source_parallelism_set_2.conf | 48 ++++++++++++++++++++++
 .../spark/execution/SourceExecuteProcessor.java    |  2 +-
 3 files changed, 82 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkCommandArgsTest.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkCommandArgsTest.java
index d5ef0c9da3..0223087e39 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkCommandArgsTest.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkCommandArgsTest.java
@@ -17,17 +17,28 @@
 
 package org.apache.seatunnel.core.starter.spark;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.SeaTunnel;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import 
org.apache.seatunnel.core.starter.spark.execution.SourceExecuteProcessor;
+import 
org.apache.seatunnel.core.starter.spark.execution.SparkRuntimeEnvironment;
 import org.apache.seatunnel.core.starter.spark.multitable.MultiTableSinkTest;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 
 import static 
org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME;
 
@@ -58,6 +69,28 @@ public class SparkCommandArgsTest {
                 commandExecuteException.getCause().getMessage());
     }
 
+    @Test
+    public void testSourceParallelismConfigWorkAndOverrideEnvConfig()
+            throws FileNotFoundException, URISyntaxException {
+        String configurePath = "/config/source_parallelism_set_2.conf";
+        String configFile = 
MultiTableSinkTest.getTestConfigFile(configurePath);
+        SparkCommandArgs sparkCommandArgs = buildSparkCommands(configFile);
+        sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
+        Config config = ConfigBuilder.of(configFile, 
sparkCommandArgs.getVariables());
+        SparkRuntimeEnvironment sparkRuntimeEnvironment =
+                SparkRuntimeEnvironment.getInstance(config);
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
+        SourceExecuteProcessor processor =
+                new SourceExecuteProcessor(
+                        sparkRuntimeEnvironment,
+                        jobContext,
+                        config.getConfigList(Constants.SOURCE));
+        List<DatasetTableInfo> datasets = new ArrayList<>();
+        List<DatasetTableInfo> result = processor.execute(datasets);
+        Assertions.assertEquals(2, 
result.get(0).getDataset().rdd().getNumPartitions());
+    }
+
     private static SparkCommandArgs buildSparkCommands(String configFile) {
         SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
         sparkCommandArgs.setConfigFile(configFile);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/source_parallelism_set_2.conf
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/source_parallelism_set_2.conf
new file mode 100644
index 0000000000..40c474357c
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/source_parallelism_set_2.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config is for testing that the source parallelism setting works and 
overrides env parallelism.
+######
+
+env {
+  parallelism = 5
+  job.mode = "BATCH"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    plugin_output = "test_source_parallelism"
+    parallelism = 2
+    plugin_name = "FakeSource"
+    schema = {
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    plugin_input = "test_source_parallelism"
+    plugin_name = "InMemory"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 9763829ec7..77fdda26fc 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -92,12 +92,12 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
                                         EnvCommonOptions.PARALLELISM.key(),
                                         
EnvCommonOptions.PARALLELISM.defaultValue());
             }
+            envOption.put(EnvCommonOptions.PARALLELISM.key(), 
String.valueOf(parallelism));
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
                             .read()
                             .format(SeaTunnelSource.class.getSimpleName())
-                            .option(EnvCommonOptions.PARALLELISM.key(), 
parallelism)
                             .option(
                                     Constants.SOURCE_SERIALIZATION,
                                     SerializationUtils.objectToString(source))

Reply via email to