This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 5dae9c1ef [API-DRAFT] [MERGE] Fix obvious bugs that don't work 
properly before merge (#2082)
5dae9c1ef is described below

commit 5dae9c1ef0e1437e27b5e55b74f9b713f414449b
Author: Hisoka <[email protected]>
AuthorDate: Wed Jun 29 17:02:23 2022 +0800

    [API-DRAFT] [MERGE] Fix obvious bugs that don't work properly before merge 
(#2082)
---
 config/flink.batch.conf.template                   | 11 ++++-----
 config/spark.streaming.conf.template               |  2 +-
 plugin-mapping.properties                          |  1 +
 pom.xml                                            |  6 ++---
 .../core/base/config/AbstractExecutionContext.java | 17 ++++++--------
 .../apache/seatunnel/core/spark/SparkStarter.java  | 27 +++++++++++++++++++---
 seatunnel-dist/src/main/assembly/assembly-bin.xml  |  1 +
 7 files changed, 42 insertions(+), 23 deletions(-)

diff --git a/config/flink.batch.conf.template b/config/flink.batch.conf.template
index 45b3e8532..65f614df0 100644
--- a/config/flink.batch.conf.template
+++ b/config/flink.batch.conf.template
@@ -26,12 +26,11 @@ env {
 
 source {
   # This is a example input plugin **only for test and demonstrate the feature 
input plugin**
-  FileSource {
-    path = "hdfs://localhost:9000/output/text"
-    format.type = "text"
-    schema = "string"
-    result_table_name = "test"
-  }
+    FakeSource {
+      result_table_name = "test"
+      field_name = "name,age"
+    }
+
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of input plugins,
   # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
diff --git a/config/spark.streaming.conf.template 
b/config/spark.streaming.conf.template
index 3c87f56e8..b79a9dec6 100644
--- a/config/spark.streaming.conf.template
+++ b/config/spark.streaming.conf.template
@@ -31,7 +31,7 @@ env {
 
 source {
   # This is a example input plugin **only for test and demonstrate the feature 
input plugin**
-  fakeStream {
+  FakeStream {
     content = ["Hello World, SeaTunnel"]
   }
 
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 01bb3abc4..5ba2e8121 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -45,6 +45,7 @@ flink.sink.Kafka = seatunnel-connector-flink-kafka
 
 spark.source.ElasticSearch = seatunnel-connector-spark-elasticsearch
 spark.source.Fake = seatunnel-connector-spark-fake
+spark.source.FakeStream = seatunnel-connector-spark-fake
 spark.source.FeishuSheet = seatunnel-connector-spark-feishu
 spark.source.File = seatunnel-connector-spark-file
 spark.source.Hbase = seatunnel-connector-spark-hbase
diff --git a/pom.xml b/pom.xml
index 9d21a4343..d60f43735 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,15 +82,15 @@
         <module>seatunnel-core</module>
         <module>seatunnel-transforms</module>
         <module>seatunnel-connectors</module>
-        <module>seatunnel-dist</module>
+        <module>seatunnel-connectors-v2</module>
+        <module>seatunnel-connectors-v2-dist</module>
         <module>seatunnel-examples</module>
         <module>seatunnel-e2e</module>
         <module>seatunnel-api</module>
         <module>seatunnel-translation</module>
         <module>seatunnel-plugin-discovery</module>
         <module>seatunnel-formats</module>
-        <module>seatunnel-connectors-v2</module>
-        <module>seatunnel-connectors-v2-dist</module>
+        <module>seatunnel-dist</module>
     </modules>
 
     <properties>
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index e3ed3d72e..92d4232c3 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -80,16 +80,13 @@ public abstract class AbstractExecutionContext<ENVIRONMENT 
extends RuntimeEnv> {
 
     @SuppressWarnings("checkstyle:Indentation")
     protected List<PluginIdentifier> getPluginIdentifiers(PluginType... 
pluginTypes) {
-        return Arrays.stream(pluginTypes).flatMap(new Function<PluginType, 
Stream<PluginIdentifier>>() {
-            @Override
-            public Stream<PluginIdentifier> apply(PluginType pluginType) {
-                List<? extends Config> configList = 
config.getConfigList(pluginType.getType());
-                return configList.stream()
-                    .map(pluginConfig -> PluginIdentifier
-                        .of(engine.getEngine(),
-                            pluginType.getType(),
-                            pluginConfig.getString("plugin_name")));
-            }
+        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, 
Stream<PluginIdentifier>>) pluginType -> {
+            List<? extends Config> configList = 
config.getConfigList(pluginType.getType());
+            return configList.stream()
+                .map(pluginConfig -> PluginIdentifier
+                    .of(engine.getEngine(),
+                        pluginType.getType(),
+                        pluginConfig.getString("plugin_name")));
         }).collect(Collectors.toList());
     }
 }
diff --git 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 02678f768..44c445403 100644
--- 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -22,12 +22,15 @@ import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.base.Starter;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.utils.CompressionUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -40,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -50,6 +54,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -220,8 +225,12 @@ public class SparkStarter implements Starter {
             return Collections.emptyList();
         }
         Config config = new 
ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
-        SparkExecutionContext sparkExecutionContext = new 
SparkExecutionContext(config, EngineType.SPARK);
-        return sparkExecutionContext.getPluginJars().stream().map(url -> new 
File(url.getPath()).toPath()).collect(Collectors.toList());
+        List<URL> pluginJars = new ArrayList<>();
+        SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new 
SparkSourcePluginDiscovery();
+        SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new 
SparkSinkPluginDiscovery();
+        
pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config,
 PluginType.SOURCE)));
+        
pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config,
 PluginType.SINK)));
+        return pluginJars.stream().map(url -> new 
File(url.getPath()).toPath()).collect(Collectors.toList());
     }
 
     /**
@@ -313,6 +322,18 @@ public class SparkStarter implements Starter {
         
commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
     }
 
+    @SuppressWarnings("checkstyle:Indentation")
+    private List<PluginIdentifier> getPluginIdentifiers(Config config, 
PluginType... pluginTypes) {
+        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, 
Stream<PluginIdentifier>>) pluginType -> {
+            List<? extends Config> configList = 
config.getConfigList(pluginType.getType());
+            return configList.stream()
+                    .map(pluginConfig -> PluginIdentifier
+                            .of(EngineType.SPARK.getEngine(),
+                                    pluginType.getType(),
+                                    pluginConfig.getString("plugin_name")));
+        }).collect(Collectors.toList());
+    }
+
     /**
      * a Starter for building spark-submit commands with client mode options
      */
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml 
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 258d8b916..afd0c7150 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -146,6 +146,7 @@
             </includes>
             <excludes>
                 <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+                <exclude>connector-common*.jar</exclude>
             </excludes>
             <outputDirectory>/connectors/seatunnel</outputDirectory>
         </fileSet>

Reply via email to