This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new da37aee  Bugfix for default branch logic
da37aee is described below

commit da37aee40ce6636fd427ac2a8fbb1f5a6e89d0c6
Author: vongosling <[email protected]>
AuthorDate: Wed Aug 25 16:34:27 2021 +0800

    Bugfix for default branch logic
---
 .../RowKeyValueDeserializationSchema.java          | 22 +++++++++++-----------
 .../deserializer/RowDeserializationSchema.java     | 10 +++++-----
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
index 9ea5aa3..0ca130d 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
@@ -219,14 +219,14 @@ public class RowKeyValueDeserializationSchema implements KeyValueDeserialization
             case SKIP_SILENT:
                 skip = true;
                 break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
             case CUT:
             case NULL:
             case PAD:
+            default:
                 row.setField(index, null);
                 break;
-            case EXCEPTION:
-                throw new RuntimeException(e);
-            default:
         }
 
         return skip;
@@ -238,13 +238,6 @@ public class RowKeyValueDeserializationSchema implements KeyValueDeserialization
                         "Field missing exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].",
                         columnSize, columnSize, data.length, StringUtils.join(data, ","));
         switch (fieldMissingStrategy) {
-            case SKIP:
-                long now = System.currentTimeMillis();
-                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
-                    LOGGER.warn(fieldMissingMessage);
-                    lastLogHandleFieldTime = now;
-                }
-                return null;
             case EXCEPTION:
                 LOGGER.error(fieldMissingMessage);
                 throw new RuntimeException(fieldMissingMessage);
@@ -252,8 +245,15 @@ public class RowKeyValueDeserializationSchema implements KeyValueDeserialization
             case CUT:
             case NULL:
             case PAD:
-            default:
                 return data;
+            case SKIP:
+            default:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    LOGGER.warn(fieldMissingMessage);
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
         }
     }
 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
index 08a0b25..8beaaa2 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -57,7 +57,7 @@ public class RowDeserializationSchema
         implements DeserializationSchema<List<BytesMessage>, RowData> {
 
     private static final long serialVersionUID = -1L;
-    private static final Logger LOG = LoggerFactory.getLogger(RowDeserializationSchema.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(RowDeserializationSchema.class);
 
     private transient TableSchema tableSchema;
     private final DirtyDataStrategy formatErrorStrategy;
@@ -180,7 +180,7 @@ public class RowDeserializationSchema
                 collector.collect(rowData);
             } else {
                 if (message.getData() == null) {
-                    LOG.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                    LOGGER.info("Deserialize empty BytesMessage body, ignore the empty message.");
                     return;
                 }
                 deserializeBytesMessage(message, collector);
@@ -287,7 +287,7 @@ public class RowDeserializationSchema
             case SKIP:
                 long now = System.currentTimeMillis();
                 if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
-                    LOG.warn(
+                    LOGGER.warn(
                             "Data format error, field type: "
                                     + fieldTypes[index]
                                     + "field data: "
@@ -323,7 +323,7 @@ public class RowDeserializationSchema
             case SKIP:
                 long now = System.currentTimeMillis();
                 if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
-                    LOG.warn(
+                    LOGGER.warn(
                             "Field missing error, table column number: "
                                     + totalColumnSize
                                     + ", data column number: "
@@ -363,7 +363,7 @@ public class RowDeserializationSchema
             case SKIP:
                 long now = System.currentTimeMillis();
                 if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
-                    LOG.warn(
+                    LOGGER.warn(
                             "Field increment error, table column number: "
                                     + totalColumnSize
                                     + ", data column number: "

Reply via email to