This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 35e896eb6 [Hotfix][core] Fix spark engine parallelism parameter does
not work (#2965)
35e896eb6 is described below
commit 35e896eb67b23e450d2ac49f6a891f39cc7826bd
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Oct 5 13:50:04 2022 +0800
[Hotfix][core] Fix spark engine parallelism parameter does not work
(#2965)
* [Hotfix][core] Fix spark engine parallelism parameter does not work
---
.../main/java/org/apache/seatunnel/spark/SparkEnvironment.java | 8 +++++++-
.../src/main/java/org/apache/seatunnel/common/Constants.java | 2 +-
.../core/starter/spark/execution/SourceExecuteProcessor.java | 8 ++++++++
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index 3e2d5e91b..b9758babe 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -46,6 +46,8 @@ public class SparkEnvironment implements RuntimeEnv {
private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
+ private SparkConf sparkConf;
+
private SparkSession sparkSession;
private StreamingContext streamingContext;
@@ -98,7 +100,7 @@ public class SparkEnvironment implements RuntimeEnv {
@Override
public SparkEnvironment prepare() {
- SparkConf sparkConf = createSparkConf();
+ sparkConf = createSparkConf();
SparkSession.Builder builder =
SparkSession.builder().config(sparkConf);
if (enableHive) {
builder.enableHiveSupport();
@@ -116,6 +118,10 @@ public class SparkEnvironment implements RuntimeEnv {
return this.streamingContext;
}
+ public SparkConf getSparkConf() {
+ return this.sparkConf;
+ }
+
private SparkConf createSparkConf() {
SparkConf sparkConf = new SparkConf();
this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(),
String.valueOf(entry.getValue().unwrapped())));
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index 8e700d89a..ad9f8e222 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,7 +31,7 @@ public final class Constants {
public static final String SOURCE_SERIALIZATION = "source.serialization";
- public static final String SOURCE_PARALLELISM = "source.parallelism";
+ public static final String SOURCE_PARALLELISM = "parallelism";
public static final String HDFS_ROOT = "hdfs.root";
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f6eafe67c..eda672914 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -54,9 +54,17 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
List<Dataset<Row>> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
+ Config pluginConfig = pluginConfigs.get(i);
+ int parallelism;
+ if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
+ parallelism =
pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+ } else {
+ parallelism =
sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+ }
Dataset<Row> dataset = sparkEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
+ .option(Constants.SOURCE_PARALLELISM, parallelism)
.option(Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema((StructType)
TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);