This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 7919a752df588d21bdd6b34a8aefd3bc07f74cc9
Author: Etienne Chauchot <[email protected]>
AuthorDate: Mon Jun 5 15:45:12 2023 +0200

    [FLINK-32222] avoid using non-public DataInputDeserializer and DataOutputSerializer. Remove overkill ThreadLocal cache. Update archunit violations
---
 .../6f48ec51-5ac9-42bb-a270-05be6c83237d           |  0
 .../dcfaa83d-a12c-48e1-9e51-b8d3808cd287           | 10 ++++++
 .../archunit-violations/stored.rules               | 14 ++++----
 .../source/split/CassandraSplitSerializer.java     | 37 +++++++++++++---------
 4 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/flink-connector-cassandra/archunit-violations/6f48ec51-5ac9-42bb-a270-05be6c83237d b/flink-connector-cassandra/archunit-violations/6f48ec51-5ac9-42bb-a270-05be6c83237d
deleted file mode 100644
index e69de29..0000000
diff --git a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287
new file mode 100644
index 0000000..5c9a448
--- /dev/null
+++ b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287
@@ -0,0 +1,10 @@
+Constructor 
<org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
<org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(CassandraSource.java:138)
+Constructor 
<org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, 
java.lang.String)> in (CassandraSource.java:124)
+Constructor 
<org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, 
java.lang.String)> in (CassandraSource.java:125)
+Constructor 
<org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, 
java.lang.String)> in (CassandraSource.java:126)
+Constructor 
<org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
<org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, 
[Ljava.lang.Object;)> in (CassandraSource.java:127)
+Method 
<org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)>
 calls method <org.apache.flink.util.Preconditions.checkState(boolean, 
java.lang.Object)> in (CassandraSource.java:145)
+Method 
<org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)>
 calls method <org.apache.flink.util.Preconditions.checkState(boolean, 
java.lang.Object)> in (CassandraSource.java:149)
+Method 
<org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(CassandraSource.java:0)
+Method 
<org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String,
 java.lang.String)> is annotated with 
<org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0)
+Method 
<org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(SplitsGenerator.java:0)
\ No newline at end of file
diff --git a/flink-connector-cassandra/archunit-violations/stored.rules b/flink-connector-cassandra/archunit-violations/stored.rules
index 60a8a80..61f4a37 100644
--- a/flink-connector-cassandra/archunit-violations/stored.rules
+++ b/flink-connector-cassandra/archunit-violations/stored.rules
@@ -1,10 +1,10 @@
 #
-#Wed Nov 23 13:43:34 CET 2022
-Production\ code\ must\ not\ call\ methods\ annotated\ with\ 
@VisibleForTesting=d181ab66-6399-4468-b7f8-1263b90d7577
-Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ 
package\ and\ be\ public\ API.=69754155-7c30-42a8-8fd3-c5a488d6d1b9
-Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=b7279bb1-1eb7-40c0-931d-f6db7971d126
+#Thu Jun 08 12:56:56 CEST 2023
 Classes\ in\ API\ packages\ should\ have\ at\ least\ one\ API\ visibility\ 
annotation.=ea12954c-9e1e-4db3-bd78-2f30ec06d270
-Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ 
must\ be\ annotated\ with\ 
@Public(Evolving).=738e8069-6550-4700-a662-dcd027d3ca55
-Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ 
annotated\ with\ @Public.=01b274c9-e1ef-4fad-accd-703c7e6ad9f3
 ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ 
extension=dc1ba6f4-3d84-498c-a085-e02ba5936201
-Connector\ production\ code\ must\ not\ depend\ on\ non-public\ API\ outside\ 
of\ connector\ packages=6f48ec51-5ac9-42bb-a270-05be6c83237d
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=b7279bb1-1eb7-40c0-931d-f6db7971d126
+Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ 
package\ and\ be\ public\ API.=69754155-7c30-42a8-8fd3-c5a488d6d1b9
+Production\ code\ must\ not\ call\ methods\ annotated\ with\ 
@VisibleForTesting=d181ab66-6399-4468-b7f8-1263b90d7577
+Connector\ production\ code\ must\ depend\ only\ on\ public\ API\ when\ 
outside\ of\ connector\ packages=dcfaa83d-a12c-48e1-9e51-b8d3808cd287
+Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ 
annotated\ with\ @Public.=01b274c9-e1ef-4fad-accd-703c7e6ad9f3
+Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ 
must\ be\ annotated\ with\ 
@Public(Evolving).=738e8069-6550-4700-a662-dcd027d3ca55
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
index 74fa573..888998d 100644
--- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
@@ -20,18 +20,18 @@ package org.apache.flink.connector.cassandra.source.split;
 
 import 
org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.math.BigInteger;
 
 /** Serializer for {@link CassandraSplit}. */
 public class CassandraSplitSerializer implements 
SimpleVersionedSerializer<CassandraSplit> {
 
     public static final CassandraSplitSerializer INSTANCE = new 
CassandraSplitSerializer();
-    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
-            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
     public static final int CURRENT_VERSION = 0;
 
@@ -44,20 +44,27 @@ public class CassandraSplitSerializer implements SimpleVersionedSerializer<Cassa
 
     @Override
     public byte[] serialize(CassandraSplit cassandraSplit) throws IOException {
-        final DataOutputSerializer out = SERIALIZER_CACHE.get();
-        BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeStart(), 
out);
-        BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeEnd(), 
out);
-        final byte[] result = out.getCopyOfBuffer();
-        out.clear();
-        return result;
+        try (final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+                final ObjectOutputStream objectOutputStream =
+                        new ObjectOutputStream(byteArrayOutputStream)) {
+            BigIntegerSerializationUtils.write(
+                    cassandraSplit.getRingRangeStart(), objectOutputStream);
+            BigIntegerSerializationUtils.write(
+                    cassandraSplit.getRingRangeEnd(), objectOutputStream);
+            objectOutputStream.flush();
+            return byteArrayOutputStream.toByteArray();
+        }
     }
 
     @Override
     public CassandraSplit deserialize(int version, byte[] serialized) throws 
IOException {
-        final DataInputDeserializer in = new DataInputDeserializer(serialized);
-
-        final BigInteger ringRangeStart = 
BigIntegerSerializationUtils.read(in);
-        final BigInteger ringRangeEnd = BigIntegerSerializationUtils.read(in);
-        return new CassandraSplit(ringRangeStart, ringRangeEnd);
+        try (final ByteArrayInputStream byteArrayInputStream =
+                        new ByteArrayInputStream(serialized);
+                final ObjectInputStream objectInputStream =
+                        new ObjectInputStream(byteArrayInputStream)) {
+            final BigInteger ringRangeStart = 
BigIntegerSerializationUtils.read(objectInputStream);
+            final BigInteger ringRangeEnd = 
BigIntegerSerializationUtils.read(objectInputStream);
+            return new CassandraSplit(ringRangeStart, ringRangeEnd);
+        }
     }
 }

Reply via email to