This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 49b2190 [SEATUNNEL#1060] Enable the MagicNumber rule in checkstyle
and fix them (#1061)
49b2190 is described below
commit 49b219003e9f6bab0dea31c93402c104d45402b4
Author: Benedict Jin <[email protected]>
AuthorDate: Mon Jan 17 13:02:50 2022 +0800
[SEATUNNEL#1060] Enable the MagicNumber rule in checkstyle and fix them
(#1061)
* [SEATUNNEL#1060] Enable the MagicNumber rule in checkstyle and fix them
* Add DEFAULT_INITIAL_CAPACITY
* Using 16 instead 1<<4 to pass CI
* Address comment
---
.../org/apache/seatunnel/flink/sink/DorisOutputFormat.java | 3 ++-
.../main/java/org/apache/seatunnel/flink/sink/DorisSink.java | 8 ++++++--
.../java/org/apache/seatunnel/flink/sink/DorisStreamLoad.java | 5 +++--
.../java/org/apache/seatunnel/flink/sink/Elasticsearch.java | 4 ++--
.../org/apache/seatunnel/flink/source/FakeSourceStream.java | 6 ++++--
.../org/apache/seatunnel/flink/sink/CsvRowOutputFormat.java | 6 +++---
.../java/org/apache/seatunnel/flink/source/FileSource.java | 4 +++-
.../main/java/org/apache/seatunnel/flink/sink/JdbcSink.java | 4 +++-
.../org/apache/seatunnel/flink/source/KafkaTableStream.java | 3 ++-
.../java/org/apache/seatunnel/flink/source/SocketStream.java | 4 ++--
.../main/java/org/apache/seatunnel/utils/AsciiArtUtils.java | 11 ++++++++---
tools/checkstyle/checkStyle.xml | 2 ++
12 files changed, 40 insertions(+), 20 deletions(-)
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisOutputFormat.java
b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisOutputFormat.java
index d0b6f4a..6965285 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisOutputFormat.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisOutputFormat.java
@@ -44,6 +44,7 @@ import java.util.regex.Pattern;
**/
public class DorisOutputFormat<T> extends RichOutputFormat<T> {
private static final Logger LOGGER =
LoggerFactory.getLogger(DorisSinkFunction.class);
+ private static final long DEFAULT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(1);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String FIELD_DELIMITER_KEY = "column_separator";
private static final String FIELD_DELIMITER_DEFAULT = "\t";
@@ -219,7 +220,7 @@ public class DorisOutputFormat<T> extends
RichOutputFormat<T> {
throw new IOException(e);
}
try {
- Thread.sleep(1000L * i);
+ Thread.sleep(DEFAULT_INTERVAL_MS * i);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("unable to flush; interrupted while
doing another attempt", e);
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
index 4e55250..9e98901 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
@@ -35,17 +35,21 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
public class DorisSink implements FlinkStreamSink<Row, Row>,
FlinkBatchSink<Row, Row> {
+ private static final int DEFAULT_BATCH_SIZE = 100;
+ private static final long DEFAULT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(1);
+
private Config config;
private String fenodes;
private String username;
private String password;
private String tableName;
private String dbName;
- private int batchSize = 100;
- private long batchIntervalMs = 1000;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private long batchIntervalMs = DEFAULT_INTERVAL_MS;
private int maxRetries = 1;
private Properties streamLoadProp = new Properties();
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisStreamLoad.java
b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisStreamLoad.java
index d2fbe4d..e82f239 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisStreamLoad.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisStreamLoad.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.flink.sink;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +84,7 @@ public class DorisStreamLoad implements Serializable {
public void load(String value) {
LoadResponse loadResponse = loadBatch(value);
LOGGER.info("Streamload Response:{}", loadResponse);
- if (loadResponse.status != 200) {
+ if (loadResponse.status != HttpResponseStatus.OK.code()) {
throw new RuntimeException("stream load error: " +
loadResponse.respContent);
} else {
try {
@@ -111,7 +112,7 @@ public class DorisStreamLoad implements Serializable {
feConn = getConnection(loadUrlStr, label);
int status = feConn.getResponseCode();
// fe send back http response code TEMPORARY_REDIRECT 307 and new
be location
- if (status != 307) {
+ if (status != HttpResponseStatus.TEMPORARY_REDIRECT.code()) {
throw new Exception("status is not TEMPORARY_REDIRECT 307,
status: " + status);
}
String location = feConn.getHeaderField("Location");
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
b/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
index e3b53a6..71a504d 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/sink/Elasticsearch.java
@@ -93,8 +93,8 @@ public class Elasticsearch implements FlinkStreamSink<Row,
Row>, FlinkBatchSink<
httpHosts,
new ElasticsearchSinkFunction<Row>() {
public IndexRequest createIndexRequest(Row element) {
- Map<String, Object> json = new HashMap<>(100);
int elementLen = element.getArity();
+ Map<String, Object> json = new HashMap<>(elementLen);
for (int i = 0; i < elementLen; i++) {
json.put(fieldNames[i], element.getField(i));
}
@@ -132,8 +132,8 @@ public class Elasticsearch implements FlinkStreamSink<Row,
Row>, FlinkBatchSink<
}
private IndexRequest createIndexRequest(Row element) {
- Map<String, Object> json = new HashMap<>(100);
int elementLen = element.getArity();
+ Map<String, Object> json = new HashMap<>(elementLen);
for (int i = 0; i < elementLen; i++) {
json.put(fieldNames[i], element.getField(i));
}
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
b/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
index e3dfcf8..a0f0cb3 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
+import java.util.concurrent.TimeUnit;
+
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
@@ -68,10 +70,10 @@ public class FakeSourceStream extends
RichParallelSourceFunction<Row> implements
@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (running) {
- int randomNum = (int) (1 + Math.random() * 3);
+ int randomNum = (int) (1 + Math.random() * NAME_ARRAY.length);
Row row = Row.of(NAME_ARRAY[randomNum - 1],
System.currentTimeMillis());
ctx.collect(row);
- Thread.sleep(1000);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/CsvRowOutputFormat.java
b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/CsvRowOutputFormat.java
index 0086e97..622f725 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/CsvRowOutputFormat.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/CsvRowOutputFormat.java
@@ -30,8 +30,8 @@ import java.io.Writer;
public class CsvRowOutputFormat extends FileOutputFormat<Row> {
+ private static final int STREAM_BUFFER_SIZE = 4096;
public static final String DEFAULT_LINE_DELIMITER =
CsvInputFormat.DEFAULT_LINE_DELIMITER;
-
public static final String DEFAULT_FIELD_DELIMITER =
CsvInputFormat.DEFAULT_FIELD_DELIMITER;
private transient Writer wrt;
@@ -131,8 +131,8 @@ public class CsvRowOutputFormat extends
FileOutputFormat<Row> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
- this.wrt = this.charsetName == null ? new OutputStreamWriter(new
BufferedOutputStream(this.stream, 4096)) :
- new OutputStreamWriter(new BufferedOutputStream(this.stream,
4096), this.charsetName);
+ this.wrt = this.charsetName == null ? new OutputStreamWriter(new
BufferedOutputStream(this.stream, STREAM_BUFFER_SIZE)) :
+ new OutputStreamWriter(new BufferedOutputStream(this.stream,
STREAM_BUFFER_SIZE), this.charsetName);
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
index 96a9822..d388283 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/source/FileSource.java
@@ -43,6 +43,8 @@ import java.util.Map;
public class FileSource implements FlinkBatchSource<Row> {
+ private static final int DEFAULT_BATCH_SIZE = 1000;
+
private Config config;
private InputFormat inputFormat;
@@ -90,7 +92,7 @@ public class FileSource implements FlinkBatchSource<Row> {
inputFormat = new ParquetRowInputFormat(filePath, messageType);
break;
case "orc":
- OrcRowInputFormat orcRowInputFormat = new
OrcRowInputFormat(path, schemaContent, null, 1000);
+ OrcRowInputFormat orcRowInputFormat = new
OrcRowInputFormat(path, schemaContent, null, DEFAULT_BATCH_SIZE);
this.inputFormat = orcRowInputFormat;
break;
case "csv":
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
index 1912cc0..149b044 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/sink/JdbcSink.java
@@ -37,13 +37,15 @@ import org.apache.flink.types.Row;
public class JdbcSink implements FlinkStreamSink<Row, Row>,
FlinkBatchSink<Row, Row> {
+ private static final int DEFAULT_BATCH_SIZE = 5000;
+
private Config config;
private String driverName;
private String dbUrl;
private String username;
private String password;
private String query;
- private int batchSize = 5000;
+ private int batchSize = DEFAULT_BATCH_SIZE;
@Override
public void setConfig(Config config) {
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
b/seatunnel-connectors/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
index 15f110d..2ae1d51 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/source/KafkaTableStream.java
@@ -62,6 +62,7 @@ public class KafkaTableStream implements
FlinkStreamSource<Row> {
private static final String GROUP_ID = "group.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String OFFSET_RESET = "offset.reset";
+ private static final int DEFAULT_INITIAL_CAPACITY = 16;
@Override
public void setConfig(Config config) {
@@ -142,7 +143,7 @@ public class KafkaTableStream implements
FlinkStreamSource<Row> {
break;
case "specific":
String offset = config.getString("offset.reset.specific");
- HashMap<Integer, Long> map = new HashMap<>(16);
+ HashMap<Integer, Long> map = new
HashMap<>(DEFAULT_INITIAL_CAPACITY);
JSONObject.parseObject(offset).forEach((k, v) ->
map.put(Integer.valueOf(k), Long.valueOf(v.toString())));
kafka.startFromSpecificOffsets(map);
break;
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
b/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
index 5513951..4d52d58 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
@@ -36,10 +36,10 @@ public class SocketStream implements FlinkStreamSource<Row>
{
private static final String HOST = "host";
private static final String PORT = "port";
+ private static final int DEFAULT_PORT = 9999;
private String host = "localhost";
-
- private int port = 9999;
+ private int port = DEFAULT_PORT;
@Override
public DataStream<Row> getData(FlinkEnvironment env) {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
index c2d19c1..5bfebf0 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
@@ -27,8 +27,13 @@ import java.awt.RenderingHints;
import java.awt.image.BufferedImage;
public class AsciiArtUtils {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(AsciiArtUtils.class);
+ private static final int FONT_SIZE = 24;
+ private static final int DRAW_X = 6;
+ private static final int RGB = -16777216;
+
/**
* Print ASCII art of string
*
@@ -40,16 +45,16 @@ public class AsciiArtUtils {
final int height = 32;
BufferedImage image = new BufferedImage(width, height,
BufferedImage.TYPE_INT_RGB);
Graphics g = image.getGraphics();
- g.setFont(new Font("Dialog", Font.PLAIN, 24));
+ g.setFont(new Font("Dialog", Font.PLAIN, FONT_SIZE));
Graphics2D graphics = (Graphics2D) g;
graphics.setRenderingHint(RenderingHints.KEY_TEXT_ANTIALIASING,
RenderingHints.VALUE_TEXT_ANTIALIAS_ON);
- graphics.drawString(str, 6, 24);
+ graphics.drawString(str, DRAW_X, FONT_SIZE);
for (int y = 0; y < height; y++) {
StringBuilder sb = new StringBuilder();
for (int x = 0; x < width; x++) {
- sb.append(image.getRGB(x, y) == -16777216 ? " " :
image.getRGB(x, y) == -1 ? "#" : "*");
+ sb.append(image.getRGB(x, y) == RGB ? " " : image.getRGB(x, y)
== -1 ? "#" : "*");
}
if (sb.toString().trim().isEmpty()) {
continue;
diff --git a/tools/checkstyle/checkStyle.xml b/tools/checkstyle/checkStyle.xml
index fcd3574..b44e1e6 100755
--- a/tools/checkstyle/checkStyle.xml
+++ b/tools/checkstyle/checkStyle.xml
@@ -195,6 +195,8 @@
<module name="OneStatementPerLine"/>
+ <module name="MagicNumber"/>
+
<module name="MultipleVariableDeclarations"/>
<module name="ArrayTypeStyle"/>