This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 27a54655 [FLINK-28758] Fix stop-with-savepoint for FlinkKafkaConsumer
27a54655 is described below
commit 27a546551b677330bf3cc38a02b569097619d8a5
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Sep 13 13:48:01 2023 +0200
[FLINK-28758] Fix stop-with-savepoint for FlinkKafkaConsumer
---
.../connectors/kafka/internals/KafkaFetcher.java | 7 ++
.../connectors/kafka/FlinkKafkaConsumerITCase.java | 129 +++++++++++++++++++++
2 files changed, 136 insertions(+)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
index bee23397..9c4d8387 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -142,6 +143,12 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
partitionConsumerRecordsHandler(partitionRecords,
partition);
}
}
+ } catch (Handover.ClosedException ex) {
+ if (running) {
+ // rethrow, only if we are running, if fetcher is not running
we should not throw
+ // the ClosedException, as we are stopping gracefully
+ ExceptionUtils.rethrowException(ex);
+ }
} finally {
// this signals the consumer thread that no more work is to be done
consumerThread.shutdown();
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java
new file mode 100644
index 00000000..90c77373
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.ClassRule;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Integration tests for {@link FlinkKafkaConsumer}.
+ *
+ * <p>The Kafka broker and the test topic come from {@link KafkaSourceTestEnv}, which is set up
+ * once for the whole class. {@link Lifecycle#PER_CLASS} is what allows the non-static
+ * {@code @BeforeAll}/{@code @AfterAll} methods below.
+ */
+@TestInstance(Lifecycle.PER_CLASS)
+public class FlinkKafkaConsumerITCase {
+    private static final String TOPIC1 = "FlinkKafkaConsumerITCase_topic1";
+
+    // NOTE(review): @ClassRule is a JUnit 4 annotation, while every test/lifecycle annotation in
+    // this class comes from JUnit 5 (Jupiter). Jupiter does not apply JUnit 4 rules by itself, so
+    // this cluster is most likely never started and the job below runs on the local environment
+    // created by getExecutionEnvironment(config) instead. Confirm, then either drop this field or
+    // replace it with a JUnit 5 extension.
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(new Configuration())
+                            .build());
+
+    /** Starts the Kafka test environment and creates {@link #TOPIC1} filled with test records. */
+    @BeforeAll
+    public void setup() throws Throwable {
+        KafkaSourceTestEnv.setup();
+        // The two boolean flags are forwarded unchanged; see KafkaSourceTestEnv#setupTopic.
+        KafkaSourceTestEnv.setupTopic(
+                TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopicWithoutTimestamp);
+    }
+
+    /** Tears down the Kafka test environment started in {@link #setup()}. */
+    @AfterAll
+    public void tearDown() throws Exception {
+        KafkaSourceTestEnv.tearDown();
+    }
+
+    /**
+     * Regression test for FLINK-28758: stop-with-savepoint must complete successfully while a
+     * {@link FlinkKafkaConsumer} source is consuming.
+     *
+     * <p>The test waits until at least one record has passed through the map operator, so the
+     * source is known to be running, and then stops the job with a savepoint.
+     *
+     * @param savepointsDir temporary directory configured as the default savepoint directory
+     */
+    @Test
+    public void testStopWithSavepoint(@TempDir Path savepointsDir) throws Exception {
+        Configuration config =
+                new Configuration()
+                        .set(
+                                CheckpointingOptions.SAVEPOINT_DIRECTORY,
+                                savepointsDir.toUri().toString());
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(1);
+
+        Properties properties = new Properties();
+        properties.setProperty(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                KafkaSourceTestEnv.brokerConnectionStrings);
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testStopWithSavepoint");
+        // Start from the beginning of the topic so the records written in setup() are consumed.
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        FlinkKafkaConsumer<Integer> kafkaConsumer =
+                new FlinkKafkaConsumer<>(
+                        TOPIC1,
+                        new TypeInformationSerializationSchema<>(
+                                BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()),
+                        properties);
+        DataStreamSource<Integer> stream = env.addSource(kafkaConsumer);
+
+        // The latch is static and shared with the map function, so reset it before job start.
+        ProgressLatchingIdentityFunction.resetBeforeUse();
+        stream.map(new ProgressLatchingIdentityFunction()).addSink(new DiscardingSink<>());
+
+        JobClient jobClient = env.executeAsync();
+
+        // Block until at least one record has reached the map operator.
+        ProgressLatchingIdentityFunction.getProgressLatch().await();
+
+        // Check that stopWithSavepoint completes successfully. advanceToEndOfEventTime is false and
+        // the target directory is null, so the savepoint is expected to be written to the
+        // SAVEPOINT_DIRECTORY configured above.
+        jobClient.stopWithSavepoint(false, null, SavepointFormatType.CANONICAL).get();
+        // TODO: ideally we should test recovery, that there were no data losses etc, but this
+        // is already a deprecated class, so I'm not adding new tests for that now.
+    }
+
+    /**
+     * Identity {@link MapFunction} that counts down a shared latch for every record. This lets the
+     * test thread wait until data is flowing through the job.
+     *
+     * <p>The latch is static rather than an instance field because Flink ships a serialized copy
+     * of the function to the task. Sharing it with the test thread therefore assumes the job runs
+     * in the same JVM as the test.
+     */
+    private static class ProgressLatchingIdentityFunction implements MapFunction<Integer, Integer> {
+
+        // volatile: assigned by the test thread in resetBeforeUse() and read by the task thread
+        // that executes map().
+        private static volatile CountDownLatch progressLatch;
+
+        /** Replaces the latch with a fresh one; call before submitting a job that uses it. */
+        static void resetBeforeUse() {
+            progressLatch = new CountDownLatch(1);
+        }
+
+        /** Returns the latch that is released once the first record has been mapped. */
+        public static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        @Override
+        public Integer map(Integer integer) throws Exception {
+            progressLatch.countDown();
+            return integer;
+        }
+    }
+}