This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit a18d7afa17027e40dd35cf1586edd9389ee6cecc
Author: Arvid Heise <[email protected]>
AuthorDate: Tue May 20 09:46:06 2025 +0200

    [FLINK-37818] Move KafkaCommitter to internal
    
    The specific class does not leak through any of the KafkaSink(Builder)
    signatures.
---
 .../archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 |  4 ++--
 .../apache/flink/connector/kafka/sink/KafkaCommittable.java  |  3 ++-
 .../org/apache/flink/connector/kafka/sink/KafkaSink.java     |  1 +
 .../org/apache/flink/connector/kafka/sink/KafkaWriter.java   |  1 +
 .../connector/kafka/sink/{ => internal}/KafkaCommitter.java  | 12 ++++++------
 .../kafka/sink/{ => internal}/KafkaCommitterTest.java        |  7 ++-----
 tools/releasing/shared                                       |  2 +-
 7 files changed, 15 insertions(+), 15 deletions(-)

diff --git 
a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
 
b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
index b96d3c6b..a4256dac 100644
--- 
a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
+++ 
b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
@@ -23,8 +23,8 @@ Method 
<org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method 
<org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()>
 calls method 
<org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int,
 java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
 Method 
<org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(ExactlyOnceKafkaWriter.java:0)
 Method 
<org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(ExactlyOnceKafkaWriter.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> 
is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaCommitter.java:0)
-Method 
<org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> 
is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaCommitter.java:0)
+Method 
<org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaCommitter.java:0)
+Method 
<org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(KafkaCommitter.java:0)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> 
in (KafkaSink.java:178)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getInputs()> in 
(KafkaSink.java:181)
 Method 
<org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in 
(KafkaSink.java:177)
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
index 1ce3b6bb..168bf2fd 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
 
 import javax.annotation.Nullable;
 
@@ -30,7 +31,7 @@ import java.util.Optional;
  * to commit transactions in {@link KafkaCommitter}.
  */
 @Internal
-class KafkaCommittable {
+public class KafkaCommittable {
 
     private final long producerId;
     private final short epoch;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 77ddc87d..be631bb0 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -31,6 +31,7 @@ import org.apache.flink.connector.kafka.lineage.LineageUtil;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 8e0bf3ba..6f5324c5 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.MetricUtil;
 import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
similarity index 95%
rename from 
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
rename to 
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
index 91e724e2..de6abbc7 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.sink;
+package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
-import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
-import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
-import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
+import org.apache.flink.connector.kafka.sink.KafkaCommittable;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 
@@ -48,7 +47,8 @@ import java.util.function.BiFunction;
  *
  * <p>The committer is responsible to finalize the Kafka transactions by 
committing them.
  */
-class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
+@Internal
+public class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaCommitter.class);
     public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE =
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java
similarity index 96%
rename from 
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
rename to 
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java
index c87bfd50..1a969b53 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitterTest.java
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.sink;
+package org.apache.flink.connector.kafka.sink.internal;
 
 import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
-import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
-import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
-import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
-import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
+import org.apache.flink.connector.kafka.sink.KafkaCommittable;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.kafka.clients.CommonClientConfigs;
diff --git a/tools/releasing/shared b/tools/releasing/shared
index c4115618..d15b90de 160000
--- a/tools/releasing/shared
+++ b/tools/releasing/shared
@@ -1 +1 @@
-Subproject commit c4115618085ac046033368e8e3a7eee59874608f
+Subproject commit d15b90de64799764d814c0b1ab2ee5e5e3baa1db

Reply via email to