This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b82d3e8e [INLONG-7014][Sort] Mysql state restore is compatible with 
old task state (#7015)
0b82d3e8e is described below

commit 0b82d3e8ebcbbef2de35ecc3d0d910a82a8ff1b5
Author: Xin Gong <[email protected]>
AuthorDate: Thu Dec 22 09:48:34 2022 +0800

    [INLONG-7014][Sort] Mysql state restore is compatible with old task state 
(#7015)
---
 .../mysql/source/split/MySqlSplitSerializer.java   | 28 ++++++++++++++--------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 325ef4fca..6bb625779 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import 
org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -36,7 +37,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
 
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.rowToSerializedString;
@@ -147,9 +147,11 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
 
     private static Map<String, Long> readReadPhaseMetric(DataInputDeserializer 
in) throws IOException {
         Map<String, Long> readPhaseMetrics = new HashMap<>();
-        final int size = in.readInt();
-        for (int i = 0; i < size; i++) {
-            readPhaseMetrics.put(in.readUTF(), in.readLong());
+        if (in.available() > 0) {
+            final int size = in.readInt();
+            for (int i = 0; i < size; i++) {
+                readPhaseMetrics.put(in.readUTF(), in.readLong());
+            }
         }
         return readPhaseMetrics;
     }
@@ -167,10 +169,12 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
 
     private static Map<String, MySqlTableMetric> 
readTableMetrics(DataInputDeserializer in) throws IOException {
         Map<String, MySqlTableMetric> tableMetrics = new HashMap<>();
-        final int size = in.readInt();
-        for (int i = 0; i < size; i++) {
-            String tableIdentify = in.readUTF();
-            tableMetrics.put(tableIdentify, new 
MySqlTableMetric(in.readLong(), in.readLong()));
+        if (in.available() > 0) {
+            final int size = in.readInt();
+            for (int i = 0; i < size; i++) {
+                String tableIdentify = in.readUTF();
+                tableMetrics.put(tableIdentify, new 
MySqlTableMetric(in.readLong(), in.readLong()));
+            }
         }
         return tableMetrics;
     }
@@ -308,8 +312,12 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
                     totalFinishedSplitSize,
                     isSuspended);
         } else if (splitKind == METRIC_SPLIT_FLAG) {
-            long numBytesIn = in.readLong();
-            long numRecordsIn = in.readLong();
+            long numBytesIn = 0L;
+            long numRecordsIn = 0L;
+            if (in.available() > 0) {
+                numBytesIn = in.readLong();
+                numRecordsIn = in.readLong();
+            }
             Map<String, Long> readPhaseMetricMap = readReadPhaseMetric(in);
             Map<String, MySqlTableMetric> tableMetricMap = 
readTableMetrics(in);
 

Reply via email to