GangLiCN opened a new issue, #6386:
URL: https://github.com/apache/seatunnel/issues/6386

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   The full command used to submit a Flink or Spark job is not cross-platform compatible; for example, it cannot run successfully on Windows.
   
   Steps to reproduce:
   1) Submit a Spark job:
   ```shell
   java -Dlog4j2.configurationFile=g:\apache\apache-seatunnel-2.3.3\config\log4j2_spark.properties -Dseatunnel.logs.path=g:\apache\apache-seatunnel-2.3.3\logs -Dseatunnel.logs.file_name=seatunnel-spark-starter -cp g:\apache\apache-seatunnel-2.3.3/lib/* org.apache.seatunnel.core.starter.spark.SparkStarter --config config/v2.batch.config.template -m local -e client
   ```
   
   or submit a Flink job:
   ```shell
   java -Dlog4j2.configurationFile=g:\apache/apache-seatunnel-2.3.3/config/log4j2_flink.properties -Dseatunnel.logs.path=g:\apache/apache-seatunnel-2.3.3/logs -Dseatunnel.logs.file_name=seatunnel-flink-fake-job -cp g:\apache/apache-seatunnel-2.3.3/lib/* org.apache.seatunnel.core.starter.flink.FlinkStarter --config ../config/fake.flink.job.yaml
   ```
   
   
   2) The above commands are converted into the following Spark/Flink-specific job submission commands:
   Flink:
   ```shell
   ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-flink-15-starter.jar --config ../config/fake.flink.job.yaml --name SeaTunnel
   ```

   Spark:
   ```shell
   ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local" --deploy-mode "client" --jars "G:\Apache\apache-seatunnel-2.3.3\lib\connector-cdc-mysql-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-clickhouse-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-fake-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-jdbc-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-kafka-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\jcl-over-slf4j-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-api-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-core-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-slf4j-impl-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\mysql-connector-j-8.0.33.jar,G:\Apache\apache-seatunnel-2.3.3\lib\postgresql-42.2.16.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-api-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-flink-15-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-spark-3-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-transforms-v2.jar,G:\Apache\apache-seatunnel-2.3.3\lib\slf4j-api-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-console-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-fake-2.3.3.jar" --conf "job.mode=BATCH" --conf "execution.parallelism=2" --conf "checkpoint.interval=10000" G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-spark-3-starter.jar --config "config/v2.batch.config.template" --master "local" --deploy-mode "client" --name "SeaTunnel"
   ```
   
   
   How to fix:
   The root cause is that the command string built to submit the Flink or Spark job hard-codes the POSIX shell placeholder syntax `${...}`, which Windows cmd.exe does not expand.
   For example, in "SparkStarter.java" (lines 196-221), the problematic line is:
           commands.add("${SPARK_HOME}/bin/spark-submit");
   
   Full code snippet:
   ```java
   protected List<String> buildFinal() {
       List<String> commands = new ArrayList<>();
       commands.add("${SPARK_HOME}/bin/spark-submit");
       appendOption(commands, "--class", SeaTunnelSpark.class.getName());
       appendOption(commands, "--name", this.commandArgs.getJobName());
       appendOption(commands, "--master", this.commandArgs.getMaster());
       appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
       appendJars(commands, this.jars);
       appendFiles(commands, this.files);
       appendSparkConf(commands, this.sparkConf);
       appendAppJar(commands);
       appendOption(commands, "--config", this.commandArgs.getConfigFile());
       appendOption(commands, "--master", this.commandArgs.getMaster());
       appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
       appendOption(commands, "--name", this.commandArgs.getJobName());
       if (commandArgs.isEncrypt()) {
           commands.add("--encrypt");
       }
       if (commandArgs.isDecrypt()) {
           commands.add("--decrypt");
       }
       if (this.commandArgs.isCheckConfig()) {
           commands.add("--check");
       }
       return commands;
   }
   ```
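   One possible direction for the fix (a sketch only; the class and method names below are illustrative, not SeaTunnel's actual code): resolve `SPARK_HOME` inside Java and pick the platform-specific launcher script, instead of emitting a shell placeholder for the shell to expand.

   ```java
   import java.nio.file.Paths;

   // Sketch of a cross-platform launcher-path builder. In real code the
   // sparkHome argument would come from System.getenv("SPARK_HOME") and
   // osName from System.getProperty("os.name").
   public class SparkLauncherPath {

       static boolean isWindows(String osName) {
           return osName.toLowerCase().contains("win");
       }

       static String sparkSubmitPath(String sparkHome, String osName) {
           // Spark distributions ship bin/spark-submit.cmd for Windows and
           // bin/spark-submit for Unix-like systems.
           String launcher = isWindows(osName) ? "spark-submit.cmd" : "spark-submit";
           return Paths.get(sparkHome, "bin", launcher).toString();
       }

       public static void main(String[] args) {
           // Hypothetical SPARK_HOME values, for demonstration only.
           System.out.println(sparkSubmitPath("/opt/spark", "Linux"));
           System.out.println(sparkSubmitPath("G:\\spark", "Windows 10"));
       }
   }
   ```

   Resolving the path in Java has the added benefit that SeaTunnel could fail fast with a clear error when `SPARK_HOME` is unset, rather than producing a command that only fails once the shell tries to run it.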
   
   The same issue exists in "FlinkStarter.java" (lines 53-95); the problematic line is:
           command.add("${FLINK_HOME}/bin/flink");
   
   Full code snippet:
   ```java
   public List<String> buildCommands() {
       List<String> command = new ArrayList<>();
       // set start command
       command.add("${FLINK_HOME}/bin/flink");
       // set deploy mode, run or run-application
       command.add(flinkCommandArgs.getDeployMode().getDeployMode());
       // set submitted target master
       if (flinkCommandArgs.getMasterType() != null) {
           command.add("--target");
           command.add(flinkCommandArgs.getMasterType().getMaster());
       }
       // set flink original parameters
       command.addAll(flinkCommandArgs.getOriginalParameters());
       // set main class name
       command.add("-c");
       command.add(APP_NAME);
       // set main jar name
       command.add(appJar);
       // set config file path
       command.add("--config");
       command.add(flinkCommandArgs.getConfigFile());
       // set check config flag
       if (flinkCommandArgs.isCheckConfig()) {
           command.add("--check");
       }
       // set job name
       command.add("--name");
       command.add(flinkCommandArgs.getJobName());
       // set encryption
       if (flinkCommandArgs.isEncrypt()) {
           command.add("--encrypt");
       }
       // set decryption
       if (flinkCommandArgs.isDecrypt()) {
           command.add("--decrypt");
       }
       // set extra system properties
       flinkCommandArgs.getVariables().stream()
               .filter(Objects::nonNull)
               .map(String::trim)
               .forEach(variable -> command.add("-D" + variable));
       return command;
   }
   ```
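   A minimal sketch of the other repair strategy, emitting whichever placeholder syntax the target shell understands (`%FLINK_HOME%` for cmd.exe, `${FLINK_HOME}` for POSIX shells). Names here are illustrative, not SeaTunnel's actual code:

   ```java
   // Choose the environment-variable expansion syntax per platform so the
   // generated command line is expandable by the shell that runs it.
   public class FlinkHomeVar {

       static String placeholder(String var, boolean windows) {
           // cmd.exe expands %VAR%; POSIX shells expand ${VAR}.
           return windows ? "%" + var + "%" : "${" + var + "}";
       }

       static String flinkLauncher(boolean windows) {
           // A complete fix would likely also use the platform path
           // separator; this sketch only addresses variable expansion.
           return placeholder("FLINK_HOME", windows) + "/bin/flink";
       }

       public static void main(String[] args) {
           System.out.println(flinkLauncher(false)); // prints ${FLINK_HOME}/bin/flink
           System.out.println(flinkLauncher(true));  // prints %FLINK_HOME%/bin/flink
       }
   }
   ```

   Of the two approaches, resolving the variable inside Java is probably more robust, since it does not depend on which shell ultimately executes the generated command.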
   
   ### SeaTunnel Version
   
   All released versions (e.g. 2.3.4, 2.3.3, or earlier)
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # You can set SeaTunnel environment configuration here
     parallelism = 2
     job.mode = "BATCH"
     checkpoint.interval = 10000
   }
   
   source {
     # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
     FakeSource {
       parallelism = 2
       result_table_name = "fake"
       row.num = 16
       schema = {
         fields {
           name = "string"
           age = "int"
         }
       }
     }
   
     # If you would like to get more information about how to configure 
SeaTunnel and see full list of source plugins,
     # please go to https://seatunnel.apache.org/docs/category/source-v2
   }
   
   sink {
     Console {
     }
   
     # If you would like to get more information about how to configure 
SeaTunnel and see full list of sink plugins,
     # please go to https://seatunnel.apache.org/docs/category/sink-v2
   }
   ```
   
   
   ### Running Command
   
   ```shell
   Flink:
   ${FLINK_HOME}/bin/flink run -c 
org.apache.seatunnel.core.starter.flink.SeaTunnelFlink 
G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-flink-15-starter.jar 
--config ../config/fake.flink.job.yaml --name SeaTunnel
   
   Spark:
   ${SPARK_HOME}/bin/spark-submit --class 
"org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" 
--master "local" --deploy-mode "client" --jars 
   "G:\Apache\apache-seatunnel-2.3.3\lib\connector-cdc-mysql-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-clickhouse-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-fake-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-jdbc-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\connector-kafka-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\jcl-over-slf4j-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-api-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-core-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\log4j-slf4j-impl-2.17.1.jar,G:\Apache\apache-seatunnel-2.3.3\lib\mysql-connector-j-8.0.33.jar,G:\Apache\apache-seatunnel-2.3.3\lib\postgresql-42.2.16.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-api-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-flink-15-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-spark-3-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-starter.jar,G:\Apache\apache-seatunnel-2.3.3\lib\seatunnel-transforms-v2.jar,G:\Apache\apache-seatunnel-2.3.3\lib\slf4j-api-1.7.25.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-console-2.3.3.jar,G:\Apache\apache-seatunnel-2.3.3\connectors\seatunnel\connector-fake-2.3.3.jar"
 --conf "job.mode=BATCH" --conf "execution.parallelism=2" --conf 
"checkpoint.interval=10000" 
G:\Apache\apache-seatunnel-2.3.3\starter\seatunnel-spark-3-starter.jar --config 
"config/v2.batch.config.template" --master "local" --deploy-mode "client" 
--name "SeaTunnel"
   ```
   
   
   ### Error Exception
   
   ```log
   The above commands cannot run on Windows since ${FLINK_HOME} and ${SPARK_HOME} are not recognized by cmd.exe; instead, %FLINK_HOME% and %SPARK_HOME% are the accepted syntax on Windows.
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   Spark: 3.X
   Flink: 1.15 or higher
   
   ### Java or Scala Version
   
   java version "1.8.0_401"
   Java(TM) SE Runtime Environment (build 1.8.0_401-b10)
   Java HotSpot(TM) 64-Bit Server VM (build 25.401-b10, mixed mode)
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

