sharonx commented on code in PR #3794:
URL: https://github.com/apache/flink-cdc/pull/3794#discussion_r1881201626
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java:
##########
@@ -72,28 +72,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
}
final DataOutputSerializer out = SERIALIZER_CACHE.get();
- out.writeInt(splitSerializer.getVersion());
- if (state instanceof SnapshotPendingSplitsState) {
- out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
- serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
- } else if (state instanceof BinlogPendingSplitsState) {
- out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
- serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
- } else if (state instanceof HybridPendingSplitsState) {
- out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
- serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
- } else {
- throw new IOException(
- "Unsupported to serialize PendingSplitsState class: "
- + state.getClass().getName());
- }
+ try {
+ out.writeInt(splitSerializer.getVersion());
+ if (state instanceof SnapshotPendingSplitsState) {
+ out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
+ serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
+ } else if (state instanceof BinlogPendingSplitsState) {
+ out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
+ serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
+ } else if (state instanceof HybridPendingSplitsState) {
+ out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
+ serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
+ } else {
+ throw new IOException(
+ "Unsupported to serialize PendingSplitsState class: "
+ + state.getClass().getName());
+ }
+
+ final byte[] result = out.getCopyOfBuffer();
+ // optimization: cache the serialized from, so we avoid the byte work during repeated
+ // serialization
+ state.serializedFormCache = result;
Review Comment:
I see the same block of code in the base module (`flink-cdc-base`):
https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java#L83-L104
Should it be fixed there as well?
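For reference when porting the change, here is a minimal sketch of the pattern the PR appears to introduce. The hunk above is cut off before the end of the `try`, so this assumes (from the test name `testOutputIsFinallyCleared`) that the cached buffer is reset in a `finally` block. It uses a `ThreadLocal<ByteArrayOutputStream>` in place of Flink's `DataOutputSerializer` cache, and the class name `CachedBufferSerializer` and its `String`-only state handling are hypothetical stand-ins, not the connector's API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Hypothetical, simplified stand-in for PendingSplitsStateSerializer: one reusable
 * buffer per thread, always reset in a finally block so a failed call leaves no residue.
 */
final class CachedBufferSerializer {

    // Per-thread reusable buffer, playing the role of SERIALIZER_CACHE (assumed shape).
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER_CACHE =
            ThreadLocal.withInitial(() -> new ByteArrayOutputStream(64));

    static final int VERSION = 1;
    static final int STRING_STATE_FLAG = 1; // hypothetical flag for the only supported type

    byte[] serialize(Object state) throws IOException {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeInt(VERSION);
            if (state instanceof String) {
                out.writeInt(STRING_STATE_FLAG);
                out.writeUTF((String) state);
            } else {
                // The version header is already in the buffer when this is thrown.
                throw new IOException(
                        "Unsupported to serialize state class: " + state.getClass().getName());
            }
            out.flush();
            // toByteArray() returns a copy, so resetting the buffer afterwards is safe.
            return buffer.toByteArray();
        } finally {
            // Runs on success and on failure, so the next call starts from an empty buffer.
            buffer.reset();
        }
    }
}
```

If the reset only ran on the success path, a failed call such as `serialize(42)` would leave its 4-byte version header in the buffer, and the next successful call would return those stale bytes ahead of its own output.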
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java:
##########
@@ -102,6 +104,22 @@ public void testRepeatedSerializationCache() throws Exception {
assertSame(ser1, ser3);
}
+ @Test
+ public void testOutputIsFinallyCleared() throws Exception {
+ final PendingSplitsStateSerializer serializer =
+ new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
+
+ final byte[] ser1 = serializer.serialize(state);
+ state.serializedFormCache = null;
+
+ PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();
+
+ assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));
+
+ final byte[] ser2 = serializer.serialize(state);
+ assertEquals(ser1.length, ser2.length);
Review Comment:
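Would it be better to assert that `ser1` and `ser2` have identical contents (for example with `assertArrayEquals`) rather than only equal lengths? As I understand it, a failed serialization should not change the state, so the second output should match the first byte for byte.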
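To make the difference concrete: because `serializedFormCache` is reset to `null` before the second call, `ser2` is a freshly allocated array, so an identity check such as `assertSame` would not apply here; a content comparison (what JUnit's `assertArrayEquals` does) is strictly stronger than comparing lengths. The sketch below uses plain `java.util.Arrays.equals`; the class and method names are illustrative only.

```java
import java.util.Arrays;

/** Illustrative only: why a length check is weaker than a content check. */
final class SerializedBytesCheck {

    /** The check used in the test as written: only the lengths are compared. */
    static boolean sameLength(byte[] a, byte[] b) {
        return a.length == b.length;
    }

    /** Element-by-element comparison, the same notion of equality assertArrayEquals uses. */
    static boolean sameContent(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    public static void main(String[] args) {
        byte[] expected = {0, 0, 0, 1, 0, 0, 0, 2};
        // Hypothetical corrupted output: same length, last byte differs.
        byte[] corrupted = {0, 0, 0, 1, 0, 0, 0, 3};
        System.out.println(sameLength(expected, corrupted));  // true
        System.out.println(sameContent(expected, corrupted)); // false
    }
}
```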