This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1d445dc [Improve]Some code block optimizations (#1600)
1d445dc is described below
commit 1d445dca906aaa20a01757bb0179ff8f362a975b
Author: Kirs <[email protected]>
AuthorDate: Mon Mar 28 22:04:43 2022 +0800
[Improve]Some code block optimizations (#1600)
* [Improve]Some code block optimizations
* fix code style
* revert spark job model
---
.../java/org/apache/seatunnel/flink/FlinkEnvironment.java | 2 +-
.../main/java/org/apache/seatunnel/common/Constants.java | 1 -
.../apache/seatunnel/command/BaseTaskExecuteCommand.java | 15 ++++-----------
.../java/org/apache/seatunnel/core/sql/job/Executor.java | 4 ++++
.../seatunnel/core/sql/splitter/SqlStatementSplitter.java | 4 ++++
5 files changed, 13 insertions(+), 13 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 647240a..d1762be 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -77,7 +77,6 @@ public class FlinkEnvironment implements RuntimeEnv {
}
@Override
-
public FlinkEnvironment prepare() {
if (isStreaming()) {
createStreamEnvironment();
@@ -100,6 +99,7 @@ public class FlinkEnvironment implements RuntimeEnv {
return JobMode.STREAMING.equals(jobMode);
}
+ @Override
public FlinkEnvironment setJobMode(JobMode jobMode) {
this.jobMode = jobMode;
return this;
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index c26c94a..f77cd01 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.common;
public final class Constants {
public static final String ROW_ROOT = "__root__";
public static final String ROW_TMP = "__tmp__";
- public static final String ROW_JSON = "__json__";
public static final String LOGO = "SeaTunnel";
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index dc8b966..d94e870 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -27,12 +27,10 @@ import org.apache.seatunnel.plugin.PluginClosedException;
import org.apache.seatunnel.utils.AsciiArtUtils;
import org.apache.seatunnel.utils.CompressionUtils;
-import org.apache.commons.compress.archivers.ArchiveException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -139,24 +137,19 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs, E extends Ru
File workDir = new File(".");
for (File file : Objects.requireNonNull(workDir.listFiles())) {
- LOGGER.warn("\t list file: " + file.getAbsolutePath());
+ LOGGER.warn("\t list file: {} ", file.getAbsolutePath());
}
// decompress plugin dir
File compressedFile = new File("plugins.tar.gz");
try {
File tempFile = CompressionUtils.unGzip(compressedFile,
workDir);
- try {
- CompressionUtils.unTar(tempFile, workDir);
- LOGGER.info("succeeded to decompress plugins.tar.gz");
- } catch (ArchiveException e) {
- LOGGER.error("failed to decompress plugins.tar.gz", e);
- System.exit(-1);
- }
- } catch (IOException e) {
+ CompressionUtils.unTar(tempFile, workDir);
+ } catch (Exception e) {
LOGGER.error("failed to decompress plugins.tar.gz", e);
System.exit(-1);
}
+ LOGGER.info("succeeded to decompress plugins.tar.gz");
}
}
diff --git
a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
index b261f82..c716d1e 100644
---
a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
+++
b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
@@ -33,6 +33,10 @@ import java.util.List;
public class Executor {
+ private Executor() {
+ throw new IllegalStateException("Utility class");
+ }
+
public static void runJob(JobInfo jobInfo) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings fsSettings =
EnvironmentSettings.newInstance().inStreamingMode().build();
diff --git
a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/splitter/SqlStatementSplitter.java
b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/splitter/SqlStatementSplitter.java
index 8982593..13029b1 100644
---
a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/splitter/SqlStatementSplitter.java
+++
b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/splitter/SqlStatementSplitter.java
@@ -26,6 +26,10 @@ import java.util.stream.Collectors;
*/
public class SqlStatementSplitter {
+ private SqlStatementSplitter() {
+ throw new IllegalStateException("Utility class");
+ }
+
private static final String COMMENT_MASK = "--.*$";
private static final String BEGINNING_COMMENT_MASK = "^(\\s)*--.*$";
private static final String SEMICOLON = ";";