This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 11d40cc5 [FLINK-38790] Update CI to Flink 2.2, adjust code
11d40cc5 is described below

commit 11d40cc50fdc64625c54443226ce7434728e91ac
Author: Ferenc Csaky <[email protected]>
AuthorDate: Tue May 12 09:07:24 2026 +0200

    [FLINK-38790] Update CI to Flink 2.2, adjust code
---
 .github/workflows/push_pr.yml                      |   2 +-
 .github/workflows/weekly.yml                       |   2 +-
 .../kafka/sink/ExactlyOnceKafkaWriter.java         |   1 +
 .../flink/connector/kafka/sink/KafkaWriter.java    |   6 +-
 .../kafka/sink/ExactlyOnceKafkaWriterTest.java     | 125 +++++++++++++++++++++
 5 files changed, 133 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 68724cff..beec1051 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,7 +28,7 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: &flink_versions [ 2.0.1, 2.1.1 ]
+        flink: &flink_versions [ 2.2.0, 2.1.1 ]
         jdk: &jdk_versions [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index eacba791..54edb776 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -33,7 +33,7 @@ jobs:
           flink: 2.2-SNAPSHOT,
           branch: main
         }, {
-          flink: 2.1.1,
+          flink: 2.1-SNAPSHOT,
           branch: main
         }, {
           flink: 2.0.1,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index 64eb1a35..6161da0d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -268,6 +268,7 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
 
     @Override
     public void close() throws Exception {
+        markClosed();
         closeAll(
                 this::abortCurrentProducer,
                 () -> closeAll(producerPool),
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index c2692766..36bd6697 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -199,7 +199,7 @@ class KafkaWriter<IN>
 
     @Override
     public void close() throws Exception {
-        closed = true;
+        markClosed();
         LOG.debug("Closing writer with {}", currentProducer);
         closeAll(currentProducer);
         checkState(
@@ -210,6 +210,10 @@ class KafkaWriter<IN>
         checkAsyncException();
     }
 
+    protected void markClosed() {
+        closed = true;
+    }
+
     @VisibleForTesting
     FlinkKafkaInternalProducer<byte[], byte[]> getCurrentProducer() {
         return currentProducer;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterTest.java
new file mode 100644
index 00000000..e9f0dc5c
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
+
+/** Tests for {@link ExactlyOnceKafkaWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class ExactlyOnceKafkaWriterTest {
+
+    @Test
+    void testCloseIgnoresAbortTriggeredAsyncError() {
+        final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+        final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+        final ExactlyOnceKafkaWriter<Integer> writer = createWriter(metricGroup);
+        writer.currentProducer =
+                new MockProducer(
+                        writer.deliveryCallback,
+                        new TransactionAbortedException("Transaction aborted during close"));
+
+        assertThatCode(writer::close).doesNotThrowAnyException();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+    }
+
+    @Test
+    void testClosePropagatesAsyncErrorReportedBeforeClose() {
+        final ExactlyOnceKafkaWriter<Integer> writer = createWriter(createSinkWriterMetricGroup());
+        writer.currentProducer = new MockProducer(writer.deliveryCallback, null);
+        writer.deliveryCallback.onCompletion(
+                null, new ProducerFencedException("Producer fenced before close"));
+
+        assertThatCode(writer::close).hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+    }
+
+    private static ExactlyOnceKafkaWriter<Integer> createWriter(SinkWriterMetricGroup metricGroup) {
+        return new ExactlyOnceKafkaWriter<>(
+                DeliveryGuarantee.EXACTLY_ONCE,
+                getKafkaClientConfiguration(),
+                "test-prefix",
+                new KafkaWriterTestBase.SinkInitContext(
+                        metricGroup, new KafkaWriterTestBase.TriggerTimeService(), null),
+                (element, context, timestamp) -> new ProducerRecord<>("topic", new byte[0]),
+                null,
+                TransactionAbortStrategyImpl.PROBING,
+                TransactionNamingStrategyImpl.INCREMENTING,
+                List.of());
+    }
+
+    private static SinkWriterMetricGroup createSinkWriterMetricGroup() {
+        return InternalSinkWriterMetricGroup.wrap(
+                new KafkaWriterTestBase.DummyOperatorMetricGroup(
+                        new MetricListener().getMetricGroup()));
+    }
+
+    private static Properties getKafkaClientConfiguration() {
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        return properties;
+    }
+
+    private static class MockProducer extends FlinkKafkaInternalProducer<byte[], byte[]> {
+
+        private final Callback callback;
+        @Nullable private final RuntimeException abortException;
+
+        private MockProducer(Callback callback, @Nullable RuntimeException abortException) {
+            super(getKafkaClientConfiguration());
+            this.callback = callback;
+            this.abortException = abortException;
+        }
+
+        @Override
+        public boolean hasRecordsInTransaction() {
+            return abortException != null;
+        }
+
+        @Override
+        public void abortTransaction() {
+            callback.onCompletion(null, abortException);
+        }
+    }
+}
