This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
new abccc0648 [Hotfix][Connector-V1][Spark-Code] Fix the problem of calling the getData() method twice (#3425)
abccc0648 is described below
commit abccc06489ce17ee859afa879c6f18650d26a508
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 13:56:48 2022 +0800
[Hotfix][Connector-V1][Spark-Code] Fix the problem of calling the getData() method twice (#3425)
Co-authored-by: xiaofu <[email protected]>
---
.../java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index 890324a6b..cfabb85b8 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -28,6 +28,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.List;
+import java.util.stream.Collectors;
public class SparkBatchExecution implements Execution<SparkBatchSource,
BaseSparkTransform, SparkBatchSink, SparkEnvironment> {
@@ -42,10 +43,11 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
@Override
    public void start(List<SparkBatchSource> sources, List<BaseSparkTransform> transforms, List<SparkBatchSink> sinks) {
-        sources.forEach(source -> SparkEnvironment.registerInputTempView(source, environment));
+        List<Dataset<Row>> sourceDatasets = sources.stream().map(source -> SparkEnvironment.registerInputTempView(source, environment))
+ .collect(Collectors.toList());
if (!sources.isEmpty()) {
- Dataset<Row> ds = sources.get(0).getData(environment);
+ Dataset<Row> ds = sourceDatasets.get(0);
for (BaseSparkTransform transform : transforms) {
                ds = SparkEnvironment.transformProcess(environment, transform, ds);
SparkEnvironment.registerTransformTempView(transform, ds);