This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 35e896eb6 [Hotfix][core] Fix spark engine parallelism parameter does not working (#2965)
35e896eb6 is described below

commit 35e896eb67b23e450d2ac49f6a891f39cc7826bd
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Oct 5 13:50:04 2022 +0800

    [Hotfix][core] Fix spark engine parallelism parameter does not working (#2965)
    
    * [Hotfix][core] Fix spark engine parallelism parameter does not working
---
 .../main/java/org/apache/seatunnel/spark/SparkEnvironment.java    | 8 +++++++-
 .../src/main/java/org/apache/seatunnel/common/Constants.java      | 2 +-
 .../core/starter/spark/execution/SourceExecuteProcessor.java      | 8 ++++++++
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index 3e2d5e91b..b9758babe 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -46,6 +46,8 @@ public class SparkEnvironment implements RuntimeEnv {
 
     private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
 
+    private SparkConf sparkConf;
+
     private SparkSession sparkSession;
 
     private StreamingContext streamingContext;
@@ -98,7 +100,7 @@ public class SparkEnvironment implements RuntimeEnv {
 
     @Override
     public SparkEnvironment prepare() {
-        SparkConf sparkConf = createSparkConf();
+        sparkConf = createSparkConf();
         SparkSession.Builder builder = 
SparkSession.builder().config(sparkConf);
         if (enableHive) {
             builder.enableHiveSupport();
@@ -116,6 +118,10 @@ public class SparkEnvironment implements RuntimeEnv {
         return this.streamingContext;
     }
 
+    public SparkConf getSparkConf() {
+        return this.sparkConf;
+    }
+
     private SparkConf createSparkConf() {
         SparkConf sparkConf = new SparkConf();
         this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(), 
String.valueOf(entry.getValue().unwrapped())));
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index 8e700d89a..ad9f8e222 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,7 +31,7 @@ public final class Constants {
 
     public static final String SOURCE_SERIALIZATION = "source.serialization";
 
-    public static final String SOURCE_PARALLELISM = "source.parallelism";
+    public static final String SOURCE_PARALLELISM = "parallelism";
 
     public static final String HDFS_ROOT = "hdfs.root";
 
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f6eafe67c..eda672914 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -54,9 +54,17 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
         List<Dataset<Row>> sources = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
             SeaTunnelSource<?, ?, ?> source = plugins.get(i);
+            Config pluginConfig = pluginConfigs.get(i);
+            int parallelism;
+            if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
+                parallelism = 
pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+            } else {
+                parallelism = 
sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+            }
             Dataset<Row> dataset = sparkEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
+                .option(Constants.SOURCE_PARALLELISM, parallelism)
                 .option(Constants.SOURCE_SERIALIZATION, 
SerializationUtils.objectToString(source))
                 .schema((StructType) 
TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);

Reply via email to