This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
commit 4f1cdfa84718e95b7092b9b77946c3275771d71d Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Jun 8 12:47:56 2023 +0200 [FLINK-32222] Automatically flush outputStreams in serializers --- .../source/enumerator/CassandraEnumeratorStateSerializer.java | 10 ++++------ .../cassandra/source/split/CassandraSplitSerializer.java | 9 ++++----- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java index bb8533b..0d4dffd 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java @@ -49,9 +49,9 @@ public class CassandraEnumeratorStateSerializer @Override public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException { - try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = - new ObjectOutputStream(byteArrayOutputStream)) { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { final Queue<CassandraSplit> splitsToReassign = cassandraEnumeratorState.getSplitsToReassign(); objectOutputStream.writeInt(splitsToReassign.size()); @@ -69,10 +69,8 @@ public class CassandraEnumeratorStateSerializer cassandraEnumeratorState.getStartToken(), objectOutputStream); BigIntegerSerializationUtils.write( cassandraEnumeratorState.getMaxToken(), objectOutputStream); - - objectOutputStream.flush(); - return byteArrayOutputStream.toByteArray(); } + return byteArrayOutputStream.toByteArray(); } @Override diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java index 888998d..a7f68d1 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java @@ -44,16 +44,15 @@ public class CassandraSplitSerializer implements SimpleVersionedSerializer<Cassa @Override public byte[] serialize(CassandraSplit cassandraSplit) throws IOException { - try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = - new ObjectOutputStream(byteArrayOutputStream)) { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { BigIntegerSerializationUtils.write( cassandraSplit.getRingRangeStart(), objectOutputStream); BigIntegerSerializationUtils.write( cassandraSplit.getRingRangeEnd(), objectOutputStream); - objectOutputStream.flush(); - return byteArrayOutputStream.toByteArray(); } + return byteArrayOutputStream.toByteArray(); } @Override
