This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ddabbab [Chore] Modify some wrong or non-standard code (#1562)
ddabbab is described below
commit ddabbab59a1b8b6260152276bb39db4fd31b8cef
Author: Kirs <[email protected]>
AuthorDate: Fri Mar 25 16:44:32 2022 +0800
[Chore] Modify some wrong or non-standard code (#1562)
---
.../apache/seatunnel/common/config/CheckConfigUtil.java | 2 +-
.../java/org/apache/seatunnel/flink/sink/ConsoleSink.java | 5 +++--
.../apache/seatunnel/flink/source/DruidInputFormat.java | 6 +++---
.../java/org/apache/seatunnel/flink/sink/FileSink.java | 6 +++---
.../org/apache/seatunnel/flink/source/InfluxDbSource.java | 2 +-
.../apache/seatunnel/command/BaseTaskExecuteCommand.java | 2 +-
.../java/org/apache/seatunnel/utils/AsciiArtUtils.java | 6 +++++-
.../java/org/apache/seatunnel/utils/CompressionUtils.java | 14 +++++++-------
8 files changed, 24 insertions(+), 19 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
index 815d8d0..632414f 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
@@ -42,7 +42,7 @@ public final class CheckConfigUtil {
.filter(param -> !isValidParam(config, param))
.collect(Collectors.toList());
- if (missingParams.size() > 0) {
+ if (!missingParams.isEmpty()) {
String errorMsg = String.format("please specify [%s] as non-empty",
String.join(",", missingParams));
return CheckResult.error(errorMsg);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index bcf0872..0b79918 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
public class ConsoleSink extends RichOutputFormat<Row> implements
FlinkBatchSink, FlinkStreamSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsoleSink.class);
+ private static final String LIMIT = "limit";
private Integer limit = Integer.MAX_VALUE;
private static final long serialVersionUID = 3482649370594181723L;
@@ -69,8 +70,8 @@ public class ConsoleSink extends RichOutputFormat<Row>
implements FlinkBatchSink
@Override
public CheckResult checkConfig() {
- if (config.hasPath("limit") && config.getInt("limit") >= -1) {
- limit = config.getInt("limit");
+ if (config.hasPath(LIMIT) && config.getInt(LIMIT) >= -1) {
+ limit = config.getInt(LIMIT);
}
return CheckResult.success();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidInputFormat.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidInputFormat.java
index c185750..62408d1 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidInputFormat.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/source/DruidInputFormat.java
@@ -91,7 +91,7 @@ public class DruidInputFormat extends RichInputFormat<Row,
InputSplit> implement
statement.close();
}
} catch (SQLException se) {
- LOG.error("DruidInputFormat Statement couldn't be closed - " +
se.getMessage());
+ LOG.error("DruidInputFormat Statement couldn't be closed", se);
} finally {
statement = null;
try {
@@ -99,7 +99,7 @@ public class DruidInputFormat extends RichInputFormat<Row,
InputSplit> implement
dbConn.close();
}
} catch (SQLException se) {
- LOG.error("DruidInputFormat couldn't be closed - " +
se.getMessage());
+ LOG.error("DruidInputFormat couldn't be closed", se);
} finally {
dbConn = null;
parameterValues = null;
@@ -162,7 +162,7 @@ public class DruidInputFormat extends RichInputFormat<Row,
InputSplit> implement
try {
resultSet.close();
} catch (SQLException se) {
- LOG.error("DruidInputFormat ResultSet couldn't be closed - " +
se.getMessage());
+ LOG.error("DruidInputFormat ResultSet couldn't be closed", se);
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 55dd971..ccda26c 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.flink.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
-import org.apache.seatunnel.common.utils.StringTemplate;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.enums.FormatType;
@@ -109,7 +109,7 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
outputFormat = new CsvRowOutputFormat(filePath);
break;
case TEXT:
- outputFormat = new TextOutputFormat<Row>(filePath);
+ outputFormat = new TextOutputFormat<>(filePath);
break;
default:
LOGGER.warn(" unknown file_format [{}],only support
json,csv,text", format);
@@ -147,7 +147,7 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
@Override
public void prepare(FlinkEnvironment env) {
String format = TypesafeConfigUtils.getConfig(config,
PATH_TIME_FORMAT, DEFAULT_TIME_FORMAT);
- String path = StringTemplate.substitute(config.getString(PATH),
format);
+ String path = VariablesSubstitute.substitute(config.getString(PATH),
format);
filePath = new Path(path);
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
index 019e7e9..812e5dc 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/source/InfluxDbSource.java
@@ -94,7 +94,7 @@ public class InfluxDbSource implements FlinkBatchSource {
@Override
public CheckResult checkConfig() {
- return CheckConfigUtil.check(config, SERVER_URL, DATABASE,
MEASUREMENT, FIELDS, FIELD_TYPES);
+ return CheckConfigUtil.checkAllExists(config, SERVER_URL, DATABASE,
MEASUREMENT, FIELDS, FIELD_TYPES);
}
@Override
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index ba46c2c..0627821 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -85,7 +85,7 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs, E extends Ru
for (Plugin<E> plugin : pluginList) {
try (Plugin<?> closed = plugin) {
// ignore
- } catch (Throwable e) {
+ } catch (Exception e) {
RuntimeException wrapperException = new RuntimeException(
String.format("plugin %s closed error",
plugin.getClass()), e);
if (exceptionHolder == null) {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
index 67305c0..780cd7e 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
@@ -57,7 +57,11 @@ public final class AsciiArtUtils {
for (int y = 0; y < height; y++) {
StringBuilder sb = new StringBuilder();
for (int x = 0; x < width; x++) {
- sb.append(image.getRGB(x, y) == RGB ? " " : image.getRGB(x, y)
== -1 ? "#" : "*");
+ if (image.getRGB(x, y) == RGB) {
+ sb.append(" ");
+ } else {
+ sb.append(image.getRGB(x, y) == -1 ? "#" : "*");
+ }
}
if (sb.toString().trim().isEmpty()) {
continue;
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java
index 09fe2b3..bab7192 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java
@@ -55,9 +55,9 @@ public final class CompressionUtils {
* @throws FileNotFoundException file not found exception
* @throws ArchiveException archive exception
*/
- public static void unTar(final File inputFile, final File outputDir)
throws FileNotFoundException, IOException, ArchiveException {
+ public static void unTar(final File inputFile, final File outputDir)
throws IOException, ArchiveException {
- LOGGER.info(String.format("Untaring %s to dir %s.",
inputFile.getAbsolutePath(), outputDir.getAbsolutePath()));
+ LOGGER.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(),
outputDir.getAbsolutePath());
final List<File> untaredFiles = new LinkedList<>();
try (final InputStream is = new FileInputStream(inputFile);
@@ -66,15 +66,15 @@ public final class CompressionUtils {
while ((entry = (TarArchiveEntry) debInputStream.getNextEntry())
!= null) {
final File outputFile = new File(outputDir, entry.getName());
if (entry.isDirectory()) {
- LOGGER.info(String.format("Attempting to write output
directory %s.", outputFile.getAbsolutePath()));
+ LOGGER.info("Attempting to write output directory {}.",
outputFile.getAbsolutePath());
if (!outputFile.exists()) {
- LOGGER.info(String.format("Attempting to create output
directory %s.", outputFile.getAbsolutePath()));
+ LOGGER.info("Attempting to create output directory
{}.", outputFile.getAbsolutePath());
if (!outputFile.mkdirs()) {
throw new
IllegalStateException(String.format("Couldn't create directory %s.",
outputFile.getAbsolutePath()));
}
}
} else {
- LOGGER.info(String.format("Creating output file %s.",
outputFile.getAbsolutePath()));
+ LOGGER.info("Creating output file {}.",
outputFile.getAbsolutePath());
final OutputStream outputFileStream = new
FileOutputStream(outputFile);
IOUtils.copy(debInputStream, outputFileStream);
outputFileStream.close();
@@ -96,9 +96,9 @@ public final class CompressionUtils {
* @throws IOException io exception
* @throws FileNotFoundException file not found exception
*/
- public static File unGzip(final File inputFile, final File outputDir)
throws FileNotFoundException, IOException {
+ public static File unGzip(final File inputFile, final File outputDir)
throws IOException {
- LOGGER.info(String.format("Ungzipping %s to dir %s.",
inputFile.getAbsolutePath(), outputDir.getAbsolutePath()));
+ LOGGER.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(),
outputDir.getAbsolutePath());
final File outputFile = new File(outputDir,
inputFile.getName().substring(0, inputFile.getName().length() - 3));