This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0b82d3e8e [INLONG-7014][Sort] Mysql state restore is compatible with old task state (#7015)
0b82d3e8e is described below
commit 0b82d3e8ebcbbef2de35ecc3d0d910a82a8ff1b5
Author: Xin Gong <[email protected]>
AuthorDate: Thu Dec 22 09:48:34 2022 +0800
[INLONG-7014][Sort] Mysql state restore is compatible with old task state (#7015)
---
.../mysql/source/split/MySqlSplitSerializer.java | 28 ++++++++++++++--------
1 file changed, 18 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 325ef4fca..6bb625779 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -36,7 +37,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition;
import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.rowToSerializedString;
@@ -147,9 +147,11 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
private static Map<String, Long> readReadPhaseMetric(DataInputDeserializer in) throws IOException {
Map<String, Long> readPhaseMetrics = new HashMap<>();
- final int size = in.readInt();
- for (int i = 0; i < size; i++) {
- readPhaseMetrics.put(in.readUTF(), in.readLong());
+ if (in.available() > 0) {
+ final int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ readPhaseMetrics.put(in.readUTF(), in.readLong());
+ }
}
return readPhaseMetrics;
}
@@ -167,10 +169,12 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
private static Map<String, MySqlTableMetric> readTableMetrics(DataInputDeserializer in) throws IOException {
Map<String, MySqlTableMetric> tableMetrics = new HashMap<>();
- final int size = in.readInt();
- for (int i = 0; i < size; i++) {
- String tableIdentify = in.readUTF();
- tableMetrics.put(tableIdentify, new MySqlTableMetric(in.readLong(), in.readLong()));
+ if (in.available() > 0) {
+ final int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ String tableIdentify = in.readUTF();
+ tableMetrics.put(tableIdentify, new MySqlTableMetric(in.readLong(), in.readLong()));
+ }
}
return tableMetrics;
}
@@ -308,8 +312,12 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
totalFinishedSplitSize,
isSuspended);
} else if (splitKind == METRIC_SPLIT_FLAG) {
- long numBytesIn = in.readLong();
- long numRecordsIn = in.readLong();
+ long numBytesIn = 0L;
+ long numRecordsIn = 0L;
+ if (in.available() > 0) {
+ numBytesIn = in.readLong();
+ numRecordsIn = in.readLong();
+ }
Map<String, Long> readPhaseMetricMap = readReadPhaseMetric(in);
Map<String, MySqlTableMetric> tableMetricMap = readTableMetrics(in);