This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
     new abccc0648 [Hotfix][Connector-V1][Spark-Code] Fix the problem of calling the getData() method twice (#3425)
abccc0648 is described below

commit abccc06489ce17ee859afa879c6f18650d26a508
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 13:56:48 2022 +0800

    [Hotfix][Connector-V1][Spark-Code] Fix the problem of calling the getData() method twice (#3425)
    
    Co-authored-by: xiaofu <[email protected]>
---
 .../java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index 890324a6b..cfabb85b8 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -28,6 +28,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink, SparkEnvironment> {
 
@@ -42,10 +43,11 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
     @Override
     public void start(List<SparkBatchSource> sources, List<BaseSparkTransform> transforms, List<SparkBatchSink> sinks) {
 
-        sources.forEach(source -> SparkEnvironment.registerInputTempView(source, environment));
+        List<Dataset<Row>> sourceDatasets = sources.stream().map(source -> SparkEnvironment.registerInputTempView(source, environment))
+                .collect(Collectors.toList());
 
         if (!sources.isEmpty()) {
-            Dataset<Row> ds = sources.get(0).getData(environment);
+            Dataset<Row> ds = sourceDatasets.get(0);
             for (BaseSparkTransform transform : transforms) {
                 ds = SparkEnvironment.transformProcess(environment, transform, ds);
                 SparkEnvironment.registerTransformTempView(transform, ds);

Reply via email to