ruanwenjun commented on issue #2212:
URL: https://github.com/apache/incubator-seatunnel/issues/2212#issuecomment-1189160233

   I changed the jar submission logic in SparkStarter, and now the connector jars are submitted to Spark via `--jars`:
   ```
   Execute SeaTunnel Spark Job: ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeatunnelSpark" --name "SeaTunnel" --master "local" --deploy-mode "client" --jars "/tmp/spark/seatunnel/connectors/seatunnel/connector-fake-2.1.3-SNAPSHOT.jar,/tmp/spark/seatunnel/connectors/seatunnel/connector-console-2.1.3-SNAPSHOT.jar" --conf "spark.executor.memory=1g" --conf "spark.master=local" --conf "job.mode=BATCH" --conf "spark.executor.cores=1" --conf "spark.app.name=SeaTunnel" --conf "spark.executor.instances=2" /tmp/spark/seatunnel/lib/seatunnel-spark-starter.jar --master local --deploy-mode client --config /tmp/fake/fakesource_to_console.conf
   ```
   But I still get an error:
   ```java
   t(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.lang.ClassNotFoundException: org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:686)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:223)
        ... 26 more
   ```
   This is caused by how we load the plugins: we find each plugin jar by path and load its classes with a `URLClassLoader`, so the plugin classes are visible only to that `URLClassLoader`. When the job is later deserialized with the application class loader, the plugin class cannot be found, which produces the `ClassNotFoundException` above. This can be fixed by #2193.
   
   BTW, we need to change `getConnectorJarDependencies` in SparkStarter: since the new connectors are placed under the `seatunnel` directory, we need to look them up there and use `SeaTunnelSourcePluginDiscovery` rather than `SparkSourcePluginDiscovery`.
   ```java
       private List<Path> getConnectorJarDependencies() {
           // New connectors live under the `seatunnel` connector dir, not `spark`.
           Path pluginRootDir = Common.connectorJarDir("seatunnel");
           LOGGER.info("Connector plugin dir is: {}", pluginRootDir);
           if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
               LOGGER.warn("Cannot find connector plugin from {}", pluginRootDir);
               return Collections.emptyList();
           }
           Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
           Set<URL> pluginJars = new HashSet<>();
           // Use the SeaTunnel discoveries instead of SparkSourcePluginDiscovery /
           // SparkSinkPluginDiscovery so the new connectors can be found.
           SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
           SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
           pluginJars.addAll(seaTunnelSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
           pluginJars.addAll(seaTunnelSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SINK)));
           return pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
       }

       private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
           // Collect one PluginIdentifier per source/sink block of the job config.
           return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
               List<? extends Config> configList = config.getConfigList(pluginType.getType());
               return configList.stream()
                       .map(pluginConfig -> PluginIdentifier
                               .of("seatunnel",
                                       pluginType.getType(),
                                       pluginConfig.getString("plugin_name")));
           }).collect(Collectors.toList());
       }
   ```
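   For context, the paths returned by `getConnectorJarDependencies` end up comma-joined into the single `--jars` value shown in the command above. A minimal illustrative sketch of that joining step (the jar paths are stand-ins, and this is not the actual SparkStarter code):
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class Main {
       public static void main(String[] args) {
           // Hypothetical stand-in for the list returned by getConnectorJarDependencies().
           List<Path> connectorJars = Arrays.asList(
                   Paths.get("/tmp/spark/seatunnel/connectors/seatunnel/connector-fake-2.1.3-SNAPSHOT.jar"),
                   Paths.get("/tmp/spark/seatunnel/connectors/seatunnel/connector-console-2.1.3-SNAPSHOT.jar"));
           // spark-submit expects one comma-separated value for --jars.
           String jarsArg = connectorJars.stream()
                   .map(Path::toString)
                   .collect(Collectors.joining(","));
           System.out.println("--jars \"" + jarsArg + "\"");
       }
   }
   ```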
   @zhangyuge1 Could you please help fix this? cc @CalvinKirs @Hisoka-X
   

