This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 4f1cdfa84718e95b7092b9b77946c3275771d71d
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Jun 8 12:47:56 2023 +0200

    [FLINK-32222] Automatically flush outputStreams in serializers
---
 .../source/enumerator/CassandraEnumeratorStateSerializer.java  | 10 ++++------
 .../cassandra/source/split/CassandraSplitSerializer.java       |  9 ++++-----
 2 files changed, 8 insertions(+), 11 deletions(-)

diff --git 
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
 
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
index bb8533b..0d4dffd 100644
--- 
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
+++ 
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
@@ -49,9 +49,9 @@ public class CassandraEnumeratorStateSerializer
 
     @Override
     public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) 
throws IOException {
-        try (final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-                final ObjectOutputStream objectOutputStream =
-                        new ObjectOutputStream(byteArrayOutputStream)) {
+        final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        try (final ObjectOutputStream objectOutputStream =
+                new ObjectOutputStream(byteArrayOutputStream)) {
             final Queue<CassandraSplit> splitsToReassign =
                     cassandraEnumeratorState.getSplitsToReassign();
             objectOutputStream.writeInt(splitsToReassign.size());
@@ -69,10 +69,8 @@ public class CassandraEnumeratorStateSerializer
                     cassandraEnumeratorState.getStartToken(), 
objectOutputStream);
             BigIntegerSerializationUtils.write(
                     cassandraEnumeratorState.getMaxToken(), 
objectOutputStream);
-
-            objectOutputStream.flush();
-            return byteArrayOutputStream.toByteArray();
         }
+        return byteArrayOutputStream.toByteArray();
     }
 
     @Override
diff --git 
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
 
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
index 888998d..a7f68d1 100644
--- 
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
+++ 
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
@@ -44,16 +44,15 @@ public class CassandraSplitSerializer implements 
SimpleVersionedSerializer<Cassa
 
     @Override
     public byte[] serialize(CassandraSplit cassandraSplit) throws IOException {
-        try (final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-                final ObjectOutputStream objectOutputStream =
-                        new ObjectOutputStream(byteArrayOutputStream)) {
+        final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        try (final ObjectOutputStream objectOutputStream =
+                new ObjectOutputStream(byteArrayOutputStream)) {
             BigIntegerSerializationUtils.write(
                     cassandraSplit.getRingRangeStart(), objectOutputStream);
             BigIntegerSerializationUtils.write(
                     cassandraSplit.getRingRangeEnd(), objectOutputStream);
-            objectOutputStream.flush();
-            return byteArrayOutputStream.toByteArray();
         }
+        return byteArrayOutputStream.toByteArray();
     }
 
     @Override

Reply via email to