This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6db9e970 [hotfix][core] check plugin type before execute (#1866)
6db9e970 is described below
commit 6db9e970dc559edded3d4dd30cd62ad069abbb29
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 13 11:11:56 2022 +0800
[hotfix][core] check plugin type before execute (#1866)
* check plugin type before execute
* Unified guava version
---
pom.xml | 7 +++
seatunnel-common/pom.xml | 4 ++
seatunnel-core/seatunnel-core-flink/pom.xml | 7 +++
.../flink/command/FlinkTaskExecuteCommand.java | 43 +++++++++++++++
.../flink/command/FlinkTaskExecuteCommandTest.java | 64 ++++++++++++++++++++++
.../spark/command/SparkTaskExecuteCommand.java | 48 ++++++++++++++++
seatunnel-dist/release-docs/LICENSE | 3 -
tools/dependencies/known-dependencies.txt | 2 -
8 files changed, 173 insertions(+), 5 deletions(-)
diff --git a/pom.xml b/pom.xml
index 29e92cb9..199fb63c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
<skipIT>true</skipIT>
<elasticsearch>7</elasticsearch>
<slf4j.version>1.7.25</slf4j.version>
+ <guava.version>19.0</guava.version>
</properties>
<dependencyManagement>
@@ -596,6 +597,12 @@
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index c6033308..cd8db778 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -48,6 +48,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml
b/seatunnel-core/seatunnel-core-flink/pom.xml
index 58df75c4..bee00a1c 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -78,10 +78,17 @@
<version>${project.version}</version>
</dependency>
+
+ <!-- test dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
index 32d13440..612d668e 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.apis.base.api.BaseSink;
import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
@@ -29,11 +31,21 @@ import
org.apache.seatunnel.core.base.config.ExecutionFactory;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSource;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.common.annotations.VisibleForTesting;
+
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Stream;
/**
* Used to execute Flink Job.
@@ -57,6 +69,7 @@ public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommand
List<BaseTransform<FlinkEnvironment>> transforms =
executionContext.getTransforms();
List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();
+ checkPluginType(executionContext.getJobMode(), sources, transforms,
sinks);
baseCheckConfig(sinks, transforms, sinks);
showAsciiLogo();
@@ -72,4 +85,34 @@ public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommand
}
}
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ void checkPluginType(JobMode jobMode, List<? extends
Plugin<FlinkEnvironment>>... plugins) {
+ Stream<? extends Plugin<?>> pluginStream =
Arrays.stream(plugins).flatMap(List::stream);
+ switch (jobMode) {
+ case STREAMING:
+ pluginStream.forEach(plugin -> {
+ boolean isStream = (plugin instanceof FlinkStreamSource)
+ || (plugin instanceof FlinkStreamTransform)
+ || (plugin instanceof FlinkStreamSink);
+ if (!isStream) {
+ throw new
IllegalArgumentException(String.format("Cannot use batch plugin: %s in stream
mode", plugin.getPluginName()));
+ }
+ });
+ break;
+ case BATCH:
+ pluginStream.forEach(plugin -> {
+ boolean isBatch = (plugin instanceof FlinkBatchSource)
+ || (plugin instanceof FlinkBatchTransform)
+ || (plugin instanceof FlinkBatchSink);
+ if (!isBatch) {
+ throw new
IllegalArgumentException(String.format("Cannot use stream plugin: %s in batch
mode", plugin.getPluginName()));
+ }
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported job mode: " +
jobMode);
+ }
+ }
+
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommandTest.java
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommandTest.java
new file mode 100644
index 00000000..fc8b3edf
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommandTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class FlinkTaskExecuteCommandTest {
+
+ @Test
+ public void checkPluginType() {
+ List<MockBatchSource> sources = Lists.newArrayList(new
MockBatchSource());
+ FlinkTaskExecuteCommand flinkTaskExecuteCommand = new
FlinkTaskExecuteCommand(null);
+ // check success
+ flinkTaskExecuteCommand.checkPluginType(JobMode.BATCH, sources);
+ Assert.assertThrows("checkPluginType should throw IllegalException",
IllegalArgumentException.class, () -> {
+ flinkTaskExecuteCommand.checkPluginType(JobMode.STREAMING,
sources);
+ });
+ }
+
+ private static class MockBatchSource implements FlinkBatchSource {
+
+ @Override
+ public void setConfig(Config config) {
+
+ }
+
+ @Override
+ public Config getConfig() {
+ return null;
+ }
+
+ @Override
+ public DataSet<Row> getData(FlinkEnvironment env) {
+ return null;
+ }
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index c8aebd4b..08eb4f2e 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.apis.base.api.BaseSink;
import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
@@ -29,11 +31,19 @@ import
org.apache.seatunnel.core.base.config.ExecutionFactory;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.spark.batch.SparkBatchSink;
+import org.apache.seatunnel.spark.batch.SparkBatchSource;
+import org.apache.seatunnel.spark.stream.SparkStreamingSink;
+import org.apache.seatunnel.spark.stream.SparkStreamingSource;
+import org.apache.seatunnel.spark.structuredstream.StructuredStreamingSink;
+import org.apache.seatunnel.spark.structuredstream.StructuredStreamingSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Stream;
public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
@@ -70,4 +80,42 @@ public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommand
}
}
+ private void checkPluginType(JobMode jobMode, List<? extends Plugin<?>>...
plugins) {
+ Stream<? extends Plugin<?>> pluginStream =
Arrays.stream(plugins).flatMap(List::stream);
+ switch (jobMode) {
+ case STREAMING:
+ pluginStream.forEach(plugin -> {
+ boolean isStream = (plugin instanceof SparkStreamingSource)
+ || (plugin instanceof SparkStreamingSink);
+ if (!isStream) {
+ throw new IllegalArgumentException(
+ String.format("Current execute mode is Streaming,
but %s is not Streaming plugin", plugin.getPluginName()));
+ }
+ });
+ break;
+ case BATCH:
+ pluginStream.forEach(plugin -> {
+ boolean isBatch = (plugin instanceof SparkBatchSource)
+ || (plugin instanceof SparkBatchSink);
+ if (!isBatch) {
+ throw new IllegalArgumentException(
+ String.format("Current execute mode is Batch, but
%s is not Batch plugin", plugin.getPluginName()));
+ }
+ });
+ break;
+ case STRUCTURED_STREAMING:
+ pluginStream.forEach(plugin -> {
+ boolean isStructuredStreaming = (plugin instanceof
StructuredStreamingSource)
+ || (plugin instanceof StructuredStreamingSink);
+ if (!isStructuredStreaming) {
+ throw new IllegalArgumentException(
+ String.format("Current execute mode is
StructuredStreaming, but %s is not StructuredStreaming plugin",
plugin.getPluginName()));
+ }
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported job mode: " +
jobMode);
+ }
+ }
+
}
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index 20b81ba0..ee7794aa 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -759,9 +759,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Google HTTP Client Library for
Java (com.google.http-client:google-http-client:1.26.0 -
https://github.com/googleapis/google-http-java-client/google-http-client)
(The Apache Software License, Version 2.0) Google OAuth Client Library
for Java (com.google.oauth-client:google-oauth-client:1.26.0 -
https://github.com/googleapis/google-oauth-java-client/google-oauth-client)
(The Apache Software License, Version 2.0) Gson
(com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/)
- (The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:11.0.2 -
http://code.google.com/p/guava-libraries/guava)
- (The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:13.0.1 -
http://code.google.com/p/guava-libraries/guava)
- (The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:16.0.1 -
http://code.google.com/p/guava-libraries/guava)
(The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
(The Apache Software License, Version 2.0) HPPC Collections
(com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
(The Apache Software License, Version 2.0) HPPC Collections
(com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index e19ddf2e..c0f1f84e 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -192,8 +192,6 @@ google-http-client-1.26.0.jar
google-http-client-jackson2-1.26.0.jar
google-oauth-client-1.26.0.jar
gson-2.2.4.jar
-guava-13.0.1.jar
-guava-16.0.1.jar
guava-19.0.jar
guice-3.0.jar
guice-4.1.0.jar