This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 11d40cc5 [FLINK-38790] Update CI to Flink 2.2, adjust code
11d40cc5 is described below
commit 11d40cc50fdc64625c54443226ce7434728e91ac
Author: Ferenc Csaky <[email protected]>
AuthorDate: Tue May 12 09:07:24 2026 +0200
[FLINK-38790] Update CI to Flink 2.2, adjust code
---
.github/workflows/push_pr.yml | 2 +-
.github/workflows/weekly.yml | 2 +-
.../kafka/sink/ExactlyOnceKafkaWriter.java | 1 +
.../flink/connector/kafka/sink/KafkaWriter.java | 6 +-
.../kafka/sink/ExactlyOnceKafkaWriterTest.java | 125 +++++++++++++++++++++
5 files changed, 133 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 68724cff..beec1051 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,7 +28,7 @@ jobs:
compile_and_test:
strategy:
matrix:
- flink: &flink_versions [ 2.0.1, 2.1.1 ]
+ flink: &flink_versions [ 2.2.0, 2.1.1 ]
jdk: &jdk_versions [ '11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index eacba791..54edb776 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -33,7 +33,7 @@ jobs:
flink: 2.2-SNAPSHOT,
branch: main
}, {
- flink: 2.1.1,
+ flink: 2.1-SNAPSHOT,
branch: main
}, {
flink: 2.0.1,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index 64eb1a35..6161da0d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -268,6 +268,7 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
@Override
public void close() throws Exception {
+ markClosed();
closeAll(
this::abortCurrentProducer,
() -> closeAll(producerPool),
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index c2692766..36bd6697 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -199,7 +199,7 @@ class KafkaWriter<IN>
@Override
public void close() throws Exception {
- closed = true;
+ markClosed();
LOG.debug("Closing writer with {}", currentProducer);
closeAll(currentProducer);
checkState(
@@ -210,6 +210,10 @@ class KafkaWriter<IN>
checkAsyncException();
}
+ protected void markClosed() {
+ closed = true;
+ }
+
@VisibleForTesting
FlinkKafkaInternalProducer<byte[], byte[]> getCurrentProducer() {
return currentProducer;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterTest.java
new file mode 100644
index 00000000..e9f0dc5c
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
+
+/** Tests for {@link ExactlyOnceKafkaWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class ExactlyOnceKafkaWriterTest {
+
+ @Test
+ void testCloseIgnoresAbortTriggeredAsyncError() {
+ final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+ final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+ final ExactlyOnceKafkaWriter<Integer> writer = createWriter(metricGroup);
+ writer.currentProducer =
+ new MockProducer(
+ writer.deliveryCallback,
+ new TransactionAbortedException("Transaction aborted during close"));
+
+ assertThatCode(writer::close).doesNotThrowAnyException();
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+ }
+
+ @Test
+ void testClosePropagatesAsyncErrorReportedBeforeClose() {
+ final ExactlyOnceKafkaWriter<Integer> writer = createWriter(createSinkWriterMetricGroup());
+ writer.currentProducer = new MockProducer(writer.deliveryCallback, null);
+ writer.deliveryCallback.onCompletion(
+ null, new ProducerFencedException("Producer fenced before close"));
+
+ assertThatCode(writer::close).hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+ }
+
+ private static ExactlyOnceKafkaWriter<Integer> createWriter(SinkWriterMetricGroup metricGroup) {
+ return new ExactlyOnceKafkaWriter<>(
+ DeliveryGuarantee.EXACTLY_ONCE,
+ getKafkaClientConfiguration(),
+ "test-prefix",
+ new KafkaWriterTestBase.SinkInitContext(
+ metricGroup, new KafkaWriterTestBase.TriggerTimeService(), null),
+ (element, context, timestamp) -> new ProducerRecord<>("topic", new byte[0]),
+ null,
+ TransactionAbortStrategyImpl.PROBING,
+ TransactionNamingStrategyImpl.INCREMENTING,
+ List.of());
+ }
+
+ private static SinkWriterMetricGroup createSinkWriterMetricGroup() {
+ return InternalSinkWriterMetricGroup.wrap(
+ new KafkaWriterTestBase.DummyOperatorMetricGroup(
+ new MetricListener().getMetricGroup()));
+ }
+
+ private static Properties getKafkaClientConfiguration() {
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+ return properties;
+ }
+
+ private static class MockProducer extends FlinkKafkaInternalProducer<byte[], byte[]> {
+
+ private final Callback callback;
+ @Nullable private final RuntimeException abortException;
+
+ private MockProducer(Callback callback, @Nullable RuntimeException abortException) {
+ super(getKafkaClientConfiguration());
+ this.callback = callback;
+ this.abortException = abortException;
+ }
+
+ @Override
+ public boolean hasRecordsInTransaction() {
+ return abortException != null;
+ }
+
+ @Override
+ public void abortTransaction() {
+ callback.onCompletion(null, abortException);
+ }
+ }
+}