This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit a18d7afa17027e40dd35cf1586edd9389ee6cecc Author: Arvid Heise <[email protected]> AuthorDate: Tue May 20 09:46:06 2025 +0200 [FLINK-37818] Move KafkaCommitter to internal There is no leaking of the specific class in any of the KafkaSink(Builder) signatures. --- .../archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 | 4 ++-- .../apache/flink/connector/kafka/sink/KafkaCommittable.java | 3 ++- .../org/apache/flink/connector/kafka/sink/KafkaSink.java | 1 + .../org/apache/flink/connector/kafka/sink/KafkaWriter.java | 1 + .../connector/kafka/sink/{ => internal}/KafkaCommitter.java | 12 ++++++------ .../kafka/sink/{ => internal}/KafkaCommitterTest.java | 7 ++----- tools/releasing/shared | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 index b96d3c6b..a4256dac 100644 --- a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 +++ b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 @@ -23,8 +23,8 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500) Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0) Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0) -Method 
<org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) -Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) +Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) +Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178) Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181) Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java index 1ce3b6bb..168bf2fd 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.annotation.Internal; import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; +import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter; import javax.annotation.Nullable; @@ -30,7 +31,7 @@ import java.util.Optional; * to commit transactions in {@link KafkaCommitter}. */ @Internal -class KafkaCommittable { +public class KafkaCommittable { private final long producerId; private final short epoch; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index 77ddc87d..be631bb0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -31,6 +31,7 @@ import org.apache.flink.connector.kafka.lineage.LineageUtil; import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet; import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider; import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; +import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index 8e0bf3ba..6f5324c5 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -25,6 +25,7 @@ import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import 
org.apache.flink.connector.kafka.MetricUtil; import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; +import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java similarity index 95% rename from flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java rename to flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java index 91e724e2..de6abbc7 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java @@ -15,14 +15,13 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.kafka.sink; +package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory; -import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; -import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; -import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel; +import org.apache.flink.connector.kafka.sink.KafkaCommittable; +import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.IOUtils; @@ -48,7 +47,8 @@ import java.util.function.BiFunction; * * <p>The committer is responsible to finalize the Kafka transactions by committing them. */ -class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { +@Internal +public class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class); public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE = diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java similarity index 96% rename from flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java rename to flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java index c87bfd50..1a969b53 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java @@ -15,13 +15,10 @@ * limitations under the 
License. */ -package org.apache.flink.connector.kafka.sink; +package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; -import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory; -import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; -import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel; -import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; +import org.apache.flink.connector.kafka.sink.KafkaCommittable; import org.apache.flink.util.TestLoggerExtension; import org.apache.kafka.clients.CommonClientConfigs; diff --git a/tools/releasing/shared b/tools/releasing/shared index c4115618..d15b90de 160000 --- a/tools/releasing/shared +++ b/tools/releasing/shared @@ -1 +1 @@ -Subproject commit c4115618085ac046033368e8e3a7eee59874608f +Subproject commit d15b90de64799764d814c0b1ab2ee5e5e3baa1db
