This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 6d5ef6da fix spark batch can't stop (#1927)
6d5ef6da is described below
commit 6d5ef6dabec2e24d524a737c389d868ec3a8eea9
Author: TrickyZerg <[email protected]>
AuthorDate: Fri May 20 10:11:35 2022 +0800
fix spark batch can't stop (#1927)
---
.../command/SeaTunnelApiTaskExecuteCommand.java | 38 ----------------------
.../spark/source/batch/BatchPartitionReader.java | 2 +-
2 files changed, 1 insertion(+), 39 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
index a12d23b4..a0e30a7b 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,21 +17,12 @@
package org.apache.seatunnel.core.spark.command;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.EnvironmentFactory;
import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.execution.SeaTunnelTaskExecution;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -66,33 +57,4 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs>
}
}
- private SeaTunnelSource<?, ?, ?> getSource(Config config) {
- PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
- // todo: use FactoryUtils to load the plugin
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
- return sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
- }
-
- private SeaTunnelSink<?, ?, ?, ?> getSink(Config config) {
- PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
- return sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
- }
-
- private PluginIdentifier getSourcePluginIdentifier() {
- return PluginIdentifier.of("seatunnel", "source", "FakeSource");
- }
-
- private PluginIdentifier getSinkPluginIdentifier() {
- return PluginIdentifier.of("seatunnel", "sink", "Console");
- }
-
- private SparkEnvironment getSparkEnvironment(Config config) {
- SparkEnvironment sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
- sparkEnvironment.setJobMode(JobMode.STREAMING);
- sparkEnvironment.setConfig(config);
- sparkEnvironment.prepare();
-
- return sparkEnvironment;
- }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 4ed7cfb1..9f5beeb5 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -76,7 +76,7 @@ public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
throw new RuntimeException(e);
}
}
- return running;
+ return running || !handover.isEmpty();
}
protected void prepare() {