This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 989679b format(code): fix many coding violations
989679b is described below
commit 989679b2b170af46cb61d2e0847e2b1af3e57451
Author: vongosling <[email protected]>
AuthorDate: Wed Aug 25 16:19:36 2021 +0800
format(code): fix many coding violations
---
.gitignore | 33 ++++++++++++++++++++
.../rocketmq/flink/common/RocketMQOptions.java | 1 -
.../rocketmq/flink/legacy/RocketMQConfig.java | 2 +-
.../RowKeyValueDeserializationSchema.java | 29 +++++++++---------
.../SimpleTupleDeserializationSchema.java | 17 +++++++++++
.../flink/legacy/common/util/MetricUtils.java | 4 +--
.../BoundedOutOfOrdernessGeneratorPerQueue.java | 1 -
.../flink/legacy/example/SimpleConsumer.java | 10 +++----
.../flink/legacy/example/SimpleProducer.java | 10 +++----
.../reader/deserializer/DeserializationSchema.java | 18 +++++++++++
.../RocketMQDeserializationSchema.java | 18 +++++++++++
.../deserializer/RowDeserializationSchema.java | 35 +++++++++++-----------
.../flink/source/util/StringSerializer.java | 4 +--
style/rmq_checkstyle.xml | 8 +++--
14 files changed, 139 insertions(+), 51 deletions(-)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..181e089
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+*.iml
+
+.idea/
+
+target/
+
+.DS_Store
+nohup.out
+
+# VSCode
+*.project
+*.settings/
+*.classpath
+*.factorypath
+.vscode/
diff --git
a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index b34826d..d03c76c 100644
--- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -80,7 +80,6 @@ public class RocketMQOptions {
public static final ConfigOption<Long> OPTIONAL_WRITE_SLEEP_TIME_MS =
ConfigOptions.key("sleepTimeMs").longType().defaultValue(5000L);
-
public static final ConfigOption<Boolean> OPTIONAL_WRITE_IS_DYNAMIC_TAG =
ConfigOptions.key("isDynamicTag").booleanType().defaultValue(false);
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index fc257a1..e16f4f3 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -162,7 +162,7 @@ public class RocketMQConfig {
client.setHeartbeatBrokerInterval(
getInteger(props, BROKER_HEART_BEAT_INTERVAL,
DEFAULT_BROKER_HEART_BEAT_INTERVAL));
// When using aliyun products, you need to set up channels
- client.setAccessChannel((getAccessChannel(props, ACCESS_CHANNEL,
DEFAULT_ACCESS_CHANNEL)));
+ client.setAccessChannel(getAccessChannel(props, ACCESS_CHANNEL,
DEFAULT_ACCESS_CHANNEL));
client.setUnitName(props.getProperty(UNIT_NAME, null));
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
index bc43b1c..9ea5aa3 100644
---
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
@@ -49,7 +49,7 @@ import java.util.Map;
public class RowKeyValueDeserializationSchema implements
KeyValueDeserializationSchema<RowData> {
private static final long serialVersionUID = -1L;
- private static final Logger logger =
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
private transient TableSchema tableSchema;
@@ -111,7 +111,7 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
return rowData;
} else {
if (value == null) {
- logger.info("Deserialize empty BytesMessage body, ignore the
empty message.");
+ LOGGER.info("Deserialize empty BytesMessage body, ignore the
empty message.");
return null;
}
return deserializeValue(value);
@@ -201,7 +201,7 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogExceptionTime >
DEFAULT_LOG_INTERVAL_MS) {
- logger.warn(
+ LOGGER.warn(
"Data format error, field type: "
+ fieldTypes[index]
+ "field data: "
@@ -219,7 +219,6 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
case SKIP_SILENT:
skip = true;
break;
- default:
case CUT:
case NULL:
case PAD:
@@ -227,6 +226,7 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
break;
case EXCEPTION:
throw new RuntimeException(e);
+ default:
}
return skip;
@@ -238,23 +238,22 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
"Field missing exception, table column number: %d,
data column number: %d, data field number: %d, data: [%s].",
columnSize, columnSize, data.length,
StringUtils.join(data, ","));
switch (fieldMissingStrategy) {
- default:
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- logger.warn(fieldMissingMessage);
+ LOGGER.warn(fieldMissingMessage);
lastLogHandleFieldTime = now;
}
return null;
+ case EXCEPTION:
+ LOGGER.error(fieldMissingMessage);
+ throw new RuntimeException(fieldMissingMessage);
case SKIP_SILENT:
- return null;
case CUT:
case NULL:
case PAD:
+ default:
return data;
- case EXCEPTION:
- logger.error(fieldMissingMessage);
- throw new RuntimeException(fieldMissingMessage);
}
}
@@ -267,20 +266,20 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- logger.warn(fieldIncrementMessage);
+ LOGGER.warn(fieldIncrementMessage);
lastLogHandleFieldTime = now;
}
return null;
case SKIP_SILENT:
return null;
- default:
+ case EXCEPTION:
+ LOGGER.error(fieldIncrementMessage);
+ throw new RuntimeException(fieldIncrementMessage);
case CUT:
case NULL:
case PAD:
+ default:
return data;
- case EXCEPTION:
- logger.error(fieldIncrementMessage);
- throw new RuntimeException(fieldIncrementMessage);
}
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
index 3bac266..440cb3e 100644
---
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleTupleDeserializationSchema.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.flink.legacy.common.serialization;
import org.apache.flink.api.common.typeinfo.TypeHint;
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
index a54a29a..6c62cc4 100644
---
a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -32,7 +32,7 @@ public class MetricUtils {
private static final String METRICS_SINK_IN_TPS = "inTps";
private static final String METRICS_SINK_OUT_TPS = "outTps";
private static final String METRICS_SINK_OUT_BPS = "outBps";
- private static final String METRICS_SINK_OUT_Latency = "outLatency";
+ private static final String METRICS_SINK_OUT_LATENCY = "outLatency";
public static Meter registerSinkInTps(RuntimeContext context) {
Counter parserCounter =
@@ -67,7 +67,7 @@ public class MetricUtils {
public static LatencyGauge registerOutLatency(RuntimeContext context) {
return context.getMetricGroup()
.addGroup(METRIC_GROUP_SINK)
- .gauge(METRICS_SINK_OUT_Latency, new LatencyGauge());
+ .gauge(METRICS_SINK_OUT_LATENCY, new LatencyGauge());
}
public static class LatencyGauge implements Gauge<Double> {
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
index ab49131..ff11251 100644
---
a/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/watermark/BoundedOutOfOrdernessGeneratorPerQueue.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-/** 取每条队列中的最大eventTime的最小值作为当前source的watermark */
public class BoundedOutOfOrdernessGeneratorPerQueue
implements AssignerWithPeriodicWatermarks<MessageExt> {
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
index 9e025bb..dfc6559 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleConsumer.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class SimpleConsumer {
- private static final Logger log =
LoggerFactory.getLogger(SimpleConsumer.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleConsumer.class);
// Consumer config
private static final String NAME_SERVER_ADDR =
@@ -44,9 +44,9 @@ public class SimpleConsumer {
private static final String TAGS = "*";
private static RPCHook getAclRPCHook() {
- final String ACCESS_KEY = "${AccessKey}";
- final String SECRET_KEY = "${SecretKey}";
- return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY,
SECRET_KEY));
+ final String accessKey = "${AccessKey}";
+ final String secretKey = "${SecretKey}";
+ return new AclClientRPCHook(new SessionCredentials(accessKey,
secretKey));
}
public static void main(String[] args) {
@@ -82,7 +82,7 @@ public class SimpleConsumer {
try {
consumer.start();
} catch (MQClientException e) {
- log.info("send message failed. {}", e.toString());
+ LOGGER.info("send message failed. {}", e.toString());
}
}
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
index ea24f60..24e86ac 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/SimpleProducer.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
public class SimpleProducer {
- private static final Logger log =
LoggerFactory.getLogger(SimpleProducer.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleProducer.class);
private static final int MESSAGE_NUM = 10000;
@@ -44,9 +44,9 @@ public class SimpleProducer {
private static final String KEY_PREFIX = "KEY";
private static RPCHook getAclRPCHook() {
- final String ACCESS_KEY = "${AccessKey}";
- final String SECRET_KEY = "${SecretKey}";
- return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY,
SECRET_KEY));
+ final String accessKey = "${AccessKey}";
+ final String secretKey = "${SecretKey}";
+ return new AclClientRPCHook(new SessionCredentials(accessKey,
secretKey));
}
public static void main(String[] args) {
@@ -74,7 +74,7 @@ public class SimpleProducer {
sendResult.getMsgId(),
sendResult.getMessageQueue().toString());
Thread.sleep(50);
} catch (Exception e) {
- log.info("send message failed. {}", e.toString());
+ LOGGER.info("send message failed. {}", e.toString());
}
}
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
index 3b087cc..f0f47f3 100644
---
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.flink.source.reader.deserializer;
import org.apache.flink.annotation.PublicEvolving;
diff --git
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
index 6358e4c..26612ea 100644
---
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.rocketmq.flink.source.reader.deserializer;
import org.apache.rocketmq.common.message.MessageExt;
diff --git
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
index b946016..08a0b25 100644
---
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -56,7 +57,7 @@ public class RowDeserializationSchema
implements DeserializationSchema<List<BytesMessage>, RowData> {
private static final long serialVersionUID = -1L;
- private static final Logger logger =
LoggerFactory.getLogger(RowDeserializationSchema.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(RowDeserializationSchema.class);
private transient TableSchema tableSchema;
private final DirtyDataStrategy formatErrorStrategy;
@@ -179,7 +180,7 @@ public class RowDeserializationSchema
collector.collect(rowData);
} else {
if (message.getData() == null) {
- logger.info("Deserialize empty BytesMessage body, ignore
the empty message.");
+ LOG.info("Deserialize empty BytesMessage body, ignore the
empty message.");
return;
}
deserializeBytesMessage(message, collector);
@@ -286,7 +287,7 @@ public class RowDeserializationSchema
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogExceptionTime >
DEFAULT_LOG_INTERVAL_MS) {
- logger.warn(
+ LOG.warn(
"Data format error, field type: "
+ fieldTypes[index]
+ "field data: "
@@ -304,7 +305,6 @@ public class RowDeserializationSchema
case SKIP_SILENT:
skip = true;
break;
- default:
case CUT:
case NULL:
case PAD:
@@ -312,6 +312,7 @@ public class RowDeserializationSchema
break;
case EXCEPTION:
throw new RuntimeException(e);
+ default:
}
return skip;
@@ -319,11 +320,10 @@ public class RowDeserializationSchema
private String[] handleFieldMissing(String[] data) {
switch (fieldMissingStrategy) {
- default:
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- logger.warn(
+ LOG.warn(
"Field missing error, table column number: "
+ totalColumnSize
+ ", data column number: "
@@ -353,6 +353,8 @@ public class RowDeserializationSchema
}
case EXCEPTION:
throw new RuntimeException();
+ default:
+ return null;
}
}
@@ -361,7 +363,7 @@ public class RowDeserializationSchema
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- logger.warn(
+ LOG.warn(
"Field increment error, table column number: "
+ totalColumnSize
+ ", data column number: "
@@ -374,9 +376,6 @@ public class RowDeserializationSchema
lastLogHandleFieldTime = now;
}
return null;
- case SKIP_SILENT:
- return null;
- default:
case CUT:
case NULL:
case PAD:
@@ -392,6 +391,8 @@ public class RowDeserializationSchema
}
case EXCEPTION:
throw new RuntimeException();
+ default:
+ return null;
}
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
b/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
index e2bcf22..cdbc78a 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
@@ -36,7 +36,7 @@ import java.util.Set;
public class StringSerializer {
public static TimestampConverter timestampConverter = new
TimestampConverter(3);
- private static final Base64.Decoder decoder = Base64.getDecoder();
+ private static final Base64.Decoder DECODER = Base64.getDecoder();
public static Object deserialize(
String value,
@@ -60,7 +60,7 @@ public class StringSerializer {
if (isRGData) {
byte[] bytes = null;
try {
- bytes = decoder.decode(value);
+ bytes = DECODER.decode(value);
} catch (Exception e) {
//
}
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
index 15c267e..1e2d7bc 100644
--- a/style/rmq_checkstyle.xml
+++ b/style/rmq_checkstyle.xml
@@ -130,13 +130,17 @@
<module name="GenericWhitespace"/>
<!--<module name="NoWhitespaceBefore"/>-->
<!--<module name="NoWhitespaceAfter"/>-->
+ <!--
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
</module>
- <module name="Indentation"/>
+ -->
+ <!--Lambda indentation should be changed for later update -->
+ <!-- <module name="Indentation"/>-->
+
<module name="MethodParamPad"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
</module>
-</module>
\ No newline at end of file
+</module>