This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new da37aee Bugfix for default branch logic
da37aee is described below
commit da37aee40ce6636fd427ac2a8fbb1f5a6e89d0c6
Author: vongosling <[email protected]>
AuthorDate: Wed Aug 25 16:34:27 2021 +0800
Bugfix for default branch logic
---
.../RowKeyValueDeserializationSchema.java | 22 +++++++++++-----------
.../deserializer/RowDeserializationSchema.java | 10 +++++-----
2 files changed, 16 insertions(+), 16 deletions(-)
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
index 9ea5aa3..0ca130d 100644
---
a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
@@ -219,14 +219,14 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
case SKIP_SILENT:
skip = true;
break;
+ case EXCEPTION:
+ throw new RuntimeException(e);
case CUT:
case NULL:
case PAD:
+ default:
row.setField(index, null);
break;
- case EXCEPTION:
- throw new RuntimeException(e);
- default:
}
return skip;
@@ -238,13 +238,6 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
"Field missing exception, table column number: %d,
data column number: %d, data field number: %d, data: [%s].",
columnSize, columnSize, data.length,
StringUtils.join(data, ","));
switch (fieldMissingStrategy) {
- case SKIP:
- long now = System.currentTimeMillis();
- if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- LOGGER.warn(fieldMissingMessage);
- lastLogHandleFieldTime = now;
- }
- return null;
case EXCEPTION:
LOGGER.error(fieldMissingMessage);
throw new RuntimeException(fieldMissingMessage);
@@ -252,8 +245,15 @@ public class RowKeyValueDeserializationSchema implements
KeyValueDeserialization
case CUT:
case NULL:
case PAD:
- default:
return data;
+ case SKIP:
+ default:
+ long now = System.currentTimeMillis();
+ if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
+ LOGGER.warn(fieldMissingMessage);
+ lastLogHandleFieldTime = now;
+ }
+ return null;
}
}
diff --git
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
index 08a0b25..8beaaa2 100644
---
a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
+++
b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -57,7 +57,7 @@ public class RowDeserializationSchema
implements DeserializationSchema<List<BytesMessage>, RowData> {
private static final long serialVersionUID = -1L;
- private static final Logger LOG =
LoggerFactory.getLogger(RowDeserializationSchema.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RowDeserializationSchema.class);
private transient TableSchema tableSchema;
private final DirtyDataStrategy formatErrorStrategy;
@@ -180,7 +180,7 @@ public class RowDeserializationSchema
collector.collect(rowData);
} else {
if (message.getData() == null) {
- LOG.info("Deserialize empty BytesMessage body, ignore the
empty message.");
+ LOGGER.info("Deserialize empty BytesMessage body, ignore
the empty message.");
return;
}
deserializeBytesMessage(message, collector);
@@ -287,7 +287,7 @@ public class RowDeserializationSchema
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogExceptionTime >
DEFAULT_LOG_INTERVAL_MS) {
- LOG.warn(
+ LOGGER.warn(
"Data format error, field type: "
+ fieldTypes[index]
+ "field data: "
@@ -323,7 +323,7 @@ public class RowDeserializationSchema
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- LOG.warn(
+ LOGGER.warn(
"Field missing error, table column number: "
+ totalColumnSize
+ ", data column number: "
@@ -363,7 +363,7 @@ public class RowDeserializationSchema
case SKIP:
long now = System.currentTimeMillis();
if (columnErrorDebug || now - lastLogHandleFieldTime >
DEFAULT_LOG_INTERVAL_MS) {
- LOG.warn(
+ LOGGER.warn(
"Field increment error, table column number: "
+ totalColumnSize
+ ", data column number: "