sharonx commented on code in PR #3794:
URL: https://github.com/apache/flink-cdc/pull/3794#discussion_r1881201626


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java:
##########
@@ -72,28 +72,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
         }
         final DataOutputSerializer out = SERIALIZER_CACHE.get();
 
-        out.writeInt(splitSerializer.getVersion());
-        if (state instanceof SnapshotPendingSplitsState) {
-            out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
-            serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
-        } else if (state instanceof BinlogPendingSplitsState) {
-            out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
-            serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
-        } else if (state instanceof HybridPendingSplitsState) {
-            out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
-            serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
-        } else {
-            throw new IOException(
-                    "Unsupported to serialize PendingSplitsState class: "
-                            + state.getClass().getName());
-        }
+        try {
+            out.writeInt(splitSerializer.getVersion());
+            if (state instanceof SnapshotPendingSplitsState) {
+                out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
+                serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
+            } else if (state instanceof BinlogPendingSplitsState) {
+                out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
+                serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
+            } else if (state instanceof HybridPendingSplitsState) {
+                out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
+                serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
+            } else {
+                throw new IOException(
+                        "Unsupported to serialize PendingSplitsState class: "
+                                + state.getClass().getName());
+            }
+
+            final byte[] result = out.getCopyOfBuffer();
+            // optimization: cache the serialized from, so we avoid the byte work during repeated
+            // serialization
+            state.serializedFormCache = result;

Review Comment:
   The same chunk of code also exists in the base folder:
   https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java#L83-L104
   Should it be fixed there too?
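
   For reference, here is a minimal sketch of the pattern in question, so it is easier to check whether the base serializer needs the same change. It assumes the fix resets the reused output buffer in a `finally` block; the hunk above is cut off before that point, so this is inferred from the `try` and from the test name `testOutputIsFinallyCleared`. `ReusableBufferSketch` and its fields are hypothetical stand-ins built on `java.io`, not Flink's `DataOutputSerializer` or the actual serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class ReusableBufferSketch {
    // Hypothetical stand-in for the cached, reused output buffer.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    byte[] serialize(Object state) throws IOException {
        try {
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(1); // version header is written before the type check
            if (state instanceof String) {
                out.writeUTF((String) state);
            } else {
                // The header bytes are already in the buffer at this point.
                throw new IOException(
                        "Unsupported state class: " + state.getClass().getName());
            }
            return buffer.toByteArray(); // copy is taken before the reset below
        } finally {
            // Runs on success and on failure, so leftover bytes from a failed
            // call cannot leak into the next call's output.
            buffer.reset();
        }
    }

    public static void main(String[] args) throws IOException {
        ReusableBufferSketch serializer = new ReusableBufferSketch();
        byte[] before = serializer.serialize("state");
        try {
            serializer.serialize(42); // unsupported type, throws
        } catch (IOException expected) {
            // ignored: only the buffer state afterwards matters here
        }
        byte[] after = serializer.serialize("state");
        System.out.println(Arrays.equals(before, after));
    }
}
```

   Without the `finally`, the version header written by the failed call would stay in the buffer and be prepended to the next result.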



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java:
##########
@@ -102,6 +104,22 @@ public void testRepeatedSerializationCache() throws Exception {
         assertSame(ser1, ser3);
     }
 
+    @Test
+    public void testOutputIsFinallyCleared() throws Exception {
+        final PendingSplitsStateSerializer serializer =
+                new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
+
+        final byte[] ser1 = serializer.serialize(state);
+        state.serializedFormCache = null;
+
+        PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();
+
+        assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));
+
+        final byte[] ser2 = serializer.serialize(state);
+        assertEquals(ser1.length, ser2.length);

Review Comment:
   Would it be better to assert that `ser1` and `ser2` are equal rather than
   only comparing their lengths? As I understand it, a failed serialization
   shouldn't change the state, so the two results should be identical.
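
   To illustrate why the length check is the weaker assertion, here is a small standalone sketch. The byte values are made up for illustration and `SerializedBytesComparison` is a hypothetical helper, not part of the PR. In a JUnit test the content check would normally be written with `assertArrayEquals(ser1, ser2)`.

```java
import java.util.Arrays;

public class SerializedBytesComparison {
    // Mirrors assertEquals(ser1.length, ser2.length): only sizes are compared.
    static boolean sameLength(byte[] a, byte[] b) {
        return a.length == b.length;
    }

    // Mirrors an element-by-element comparison of the two arrays.
    static boolean sameContent(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    public static void main(String[] args) {
        // Made-up payloads: same size, but the last byte differs.
        byte[] ser1 = {0, 0, 0, 5, 1};
        byte[] ser2 = {0, 0, 0, 5, 2};

        System.out.println("same length:  " + sameLength(ser1, ser2));
        System.out.println("same content: " + sameContent(ser1, ser2));
    }
}
```

   A corrupted result with the same size as the original would pass the length check but fail the content check.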



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
