This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
commit 7919a752df588d21bdd6b34a8aefd3bc07f74cc9 Author: Etienne Chauchot <[email protected]> AuthorDate: Mon Jun 5 15:45:12 2023 +0200 [FLINK-32222] avoid using non-public DataInputDeserializer and DataOutputSerializer. Remove overkill ThreadLocal cache. Update archunit violations --- .../6f48ec51-5ac9-42bb-a270-05be6c83237d | 0 .../dcfaa83d-a12c-48e1-9e51-b8d3808cd287 | 10 ++++++ .../archunit-violations/stored.rules | 14 ++++---- .../source/split/CassandraSplitSerializer.java | 37 +++++++++++++--------- 4 files changed, 39 insertions(+), 22 deletions(-) diff --git a/flink-connector-cassandra/archunit-violations/6f48ec51-5ac9-42bb-a270-05be6c83237d b/flink-connector-cassandra/archunit-violations/6f48ec51-5ac9-42bb-a270-05be6c83237d deleted file mode 100644 index e69de29..0000000 diff --git a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 new file mode 100644 index 0000000..5c9a448 --- /dev/null +++ b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 @@ -0,0 +1,10 @@ +Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (CassandraSource.java:138) +Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:124) +Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:125) +Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:126) +Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (CassandraSource.java:127) +Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:145) +Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:149) +Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSource.java:0) +Method <org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0) +Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0) \ No newline at end of file diff --git a/flink-connector-cassandra/archunit-violations/stored.rules b/flink-connector-cassandra/archunit-violations/stored.rules index 60a8a80..61f4a37 100644 --- a/flink-connector-cassandra/archunit-violations/stored.rules +++ b/flink-connector-cassandra/archunit-violations/stored.rules @@ -1,10 +1,10 @@ # -#Wed Nov 23 13:43:34 CET 2022 -Production\ code\ must\ not\ call\ methods\ annotated\ with\ @VisibleForTesting=d181ab66-6399-4468-b7f8-1263b90d7577 -Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ package\ and\ be\ public\ API.=69754155-7c30-42a8-8fd3-c5a488d6d1b9 -Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=b7279bb1-1eb7-40c0-931d-f6db7971d126 +#Thu Jun 08 12:56:56 CEST 2023 Classes\ in\ API\ packages\ should\ have\ at\ least\ one\ API\ visibility\ annotation.=ea12954c-9e1e-4db3-bd78-2f30ec06d270 -Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ must\ be\ annotated\ with\ @Public(Evolving).=738e8069-6550-4700-a662-dcd027d3ca55 -Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ annotated\ with\ @Public.=01b274c9-e1ef-4fad-accd-703c7e6ad9f3 ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=dc1ba6f4-3d84-498c-a085-e02ba5936201 -Connector\ production\ code\ must\ not\ depend\ on\ non-public\ API\ outside\ of\ connector\ packages=6f48ec51-5ac9-42bb-a270-05be6c83237d +Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=b7279bb1-1eb7-40c0-931d-f6db7971d126 +Options\ for\ connectors\ and\ formats\ should\ reside\ in\ a\ consistent\ package\ and\ be\ public\ API.=69754155-7c30-42a8-8fd3-c5a488d6d1b9 +Production\ code\ must\ not\ call\ methods\ annotated\ with\ @VisibleForTesting=d181ab66-6399-4468-b7f8-1263b90d7577 +Connector\ production\ code\ must\ depend\ only\ on\ public\ API\ when\ outside\ of\ connector\ packages=dcfaa83d-a12c-48e1-9e51-b8d3808cd287 +Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @Public\ must\ be\ annotated\ with\ @Public.=01b274c9-e1ef-4fad-accd-703c7e6ad9f3 +Return\ and\ argument\ types\ of\ methods\ annotated\ with\ @PublicEvolving\ must\ be\ annotated\ with\ @Public(Evolving).=738e8069-6550-4700-a662-dcd027d3ca55 diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java index 74fa573..888998d 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java @@ -20,18 +20,18 @@ package org.apache.flink.connector.cassandra.source.split; import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.math.BigInteger; /** Serializer for {@link CassandraSplit}. */ public class CassandraSplitSerializer implements SimpleVersionedSerializer<CassandraSplit> { public static final CassandraSplitSerializer INSTANCE = new CassandraSplitSerializer(); - private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = - ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); public static final int CURRENT_VERSION = 0; @@ -44,20 +44,27 @@ public class CassandraSplitSerializer implements SimpleVersionedSerializer<Cassa @Override public byte[] serialize(CassandraSplit cassandraSplit) throws IOException { - final DataOutputSerializer out = SERIALIZER_CACHE.get(); - BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeStart(), out); - BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeEnd(), out); - final byte[] result = out.getCopyOfBuffer(); - out.clear(); - return result; + try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { + BigIntegerSerializationUtils.write( + cassandraSplit.getRingRangeStart(), objectOutputStream); + BigIntegerSerializationUtils.write( + cassandraSplit.getRingRangeEnd(), objectOutputStream); + objectOutputStream.flush(); + return byteArrayOutputStream.toByteArray(); + } } @Override public CassandraSplit deserialize(int version, byte[] serialized) throws IOException { - final DataInputDeserializer in = new DataInputDeserializer(serialized); - - final BigInteger ringRangeStart = BigIntegerSerializationUtils.read(in); - final BigInteger ringRangeEnd = BigIntegerSerializationUtils.read(in); - return new CassandraSplit(ringRangeStart, ringRangeEnd); + try (final ByteArrayInputStream byteArrayInputStream = + new ByteArrayInputStream(serialized); + final ObjectInputStream objectInputStream = + new ObjectInputStream(byteArrayInputStream)) { + final BigInteger ringRangeStart = BigIntegerSerializationUtils.read(objectInputStream); + final BigInteger ringRangeEnd = BigIntegerSerializationUtils.read(objectInputStream); + return new CassandraSplit(ringRangeStart, ringRangeEnd); + } } }
