Repository: flink
Updated Branches:
  refs/heads/release-1.1 caa0fbb21 -> 90d77594f


[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()'
may take a very long time. This is mostly relevant for low-throughput Kafka topics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90d77594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90d77594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90d77594

Branch: refs/heads/release-1.1
Commit: 90d77594fffda1d8d15658d363c478ea6430514e
Parents: caa0fbb
Author: Stephan Ewen <[email protected]>
Authored: Thu Sep 29 18:09:51 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Sep 30 12:39:53 2016 +0200

----------------------------------------------------------------------
 .../kafka/internal/Kafka09Fetcher.java          |  73 +++--
 .../connectors/kafka/Kafka09FetcherTest.java    | 304 +++++++++++++++++++
 2 files changed, 355 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 9c861c9..1da2259 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer 
API.
@@ -74,18 +76,24 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
        /** The maximum number of milliseconds to wait for a fetch batch */
        private final long pollTimeout;
 
-       /** Mutex to guard against concurrent access to the non-threadsafe 
Kafka consumer */
-       private final Object consumerLock = new Object();
+       /** The next offsets that the main thread should commit */
+       private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> 
nextOffsetsToCommit;
+       
+       /** The callback invoked by Kafka once an offset commit is complete */
+       private final OffsetCommitCallback offsetCommitCallback;
 
        /** Reference to the Kafka consumer, once it is created */
        private volatile KafkaConsumer<byte[], byte[]> consumer;
-
+       
        /** Reference to the proxy, forwarding exceptions from the fetch thread 
to the main thread */
        private volatile ExceptionProxy errorHandler;
 
        /** Flag to mark the main work loop as alive */
        private volatile boolean running = true;
 
+       /** Flag tracking whether the latest commit request has completed */
+       private volatile boolean commitInProgress;
+
        // 
------------------------------------------------------------------------
 
        public Kafka09Fetcher(
@@ -105,6 +113,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                this.runtimeContext = runtimeContext;
                this.kafkaProperties = kafkaProperties;
                this.pollTimeout = pollTimeout;
+               this.nextOffsetsToCommit = new AtomicReference<>();
+               this.offsetCommitCallback = new CommitCallback();
 
                // if checkpointing is enabled, we are not automatically 
committing to Kafka.
                
kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -203,19 +213,23 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
 
                        // main fetch loop
                        while (running) {
+
+                               // check if there is something to commit
+                               final Map<TopicPartition, OffsetAndMetadata> 
toCommit = nextOffsetsToCommit.getAndSet(null);
+                               if (toCommit != null && !commitInProgress) {
+                                       // reset the work-to-be committed, so 
we don't repeatedly commit the same
+                                       // also record that a commit is already 
in progress
+                                       commitInProgress = true;
+                                       consumer.commitAsync(toCommit, 
offsetCommitCallback);
+                               }
+
                                // get the next batch of records
                                final ConsumerRecords<byte[], byte[]> records;
-                               synchronized (consumerLock) {
-                                       try {
-                                               records = 
consumer.poll(pollTimeout);
-                                       }
-                                       catch (WakeupException we) {
-                                               if (running) {
-                                                       throw we;
-                                               } else {
-                                                       continue;
-                                               }
-                                       }
+                               try {
+                                       records = consumer.poll(pollTimeout);
+                               }
+                               catch (WakeupException we) {
+                                       continue;
                                }
 
                                // get the records for each topic partition
@@ -252,10 +266,9 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                }
                finally {
                        try {
-                               synchronized (consumerLock) {
-                                       consumer.close();
-                               }
-                       } catch (Throwable t) {
+                               consumer.close();
+                       }
+                       catch (Throwable t) {
                                LOG.warn("Error while closing Kafka 0.9 
consumer", t);
                        }
                }
@@ -283,10 +296,14 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                        }
                }
 
-               if (this.consumer != null) {
-                       synchronized (consumerLock) {
-                               this.consumer.commitSync(offsetsToCommit);
-                       }
+               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
+               if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+                       LOG.warn("Committing offsets to Kafka takes longer than 
the checkpoint interval. " +
+                                       "Skipping commit of previous offsets 
because newer complete checkpoint offsets are available. " +
+                                       "This does not compromise Flink's 
checkpoint integrity.");
+               }
+               if (consumer != null) {
+                       consumer.wakeup();
                }
        }
 
@@ -301,4 +318,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                }
                return result;
        }
+
+       private class CommitCallback implements OffsetCommitCallback {
+
+               @Override
+               public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception ex) {
+                       commitInProgress = false;
+
+                       if (ex != null) {
+                               LOG.warn("Committing offsets to Kafka failed. 
This does not compromise Flink's checkpoints.", ex);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
new file mode 100644
index 0000000..4fd6c9f
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+       @Test
+       public void testCommitDoesNotBlock() throws Exception {
+
+               // test data
+               final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+               final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+               testCommitData.put(testPartition, 11L);
+
+               // to synchronize when the consumer is in its blocking method
+               final OneShotLatch sync = new OneShotLatch();
+
+               // ----- the mock consumer with blocking poll calls ----
+               final MultiShotLatch blockerLatch = new MultiShotLatch();
+               
+               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
+                               sync.trigger();
+                               blockerLatch.await();
+                               return ConsumerRecords.empty();
+                       }
+               });
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               blockerLatch.trigger();
+                               return null;
+                       }
+               }).when(mockConsumer).wakeup();
+
+               // make sure the fetcher creates the mock consumer
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- create the test fetcher -----
+
+               @SuppressWarnings("unchecked")
+               SourceContext<String> sourceContext = mock(SourceContext.class);
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+               
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                               sourceContext, topics, null, null, context, 
schema, new Properties(), 0L, false);
+
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // wait until the fetcher has reached the method of interest
+               sync.await();
+
+               // ----- trigger the offset commit -----
+               
+               final AtomicReference<Throwable> commitError = new 
AtomicReference<>();
+               final Thread committer = new Thread("committer runner") {
+                       @Override
+                       public void run() {
+                               try {
+                                       
fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                               } catch (Throwable t) {
+                                       commitError.set(t);
+                               }
+                       }
+               };
+               committer.start();
+
+               // ----- ensure that the committer finishes in time  -----
+               committer.join(30000);
+               assertFalse("The committer did not finish in time", 
committer.isAlive());
+
+               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
+               fetcher.cancel();
+               fetcherRunner.join();
+
+               // check that there were no errors in the fetcher
+               final Throwable fetcherError = error.get();
+               if (fetcherError != null) {
+                       throw new Exception("Exception in the fetcher", 
fetcherError);
+               }
+               final Throwable committerError = commitError.get();
+               if (committerError != null) {
+                       throw new Exception("Exception in the committer", 
committerError);
+               }
+       }
+
+       @Test
+       public void ensureOffsetsGetCommitted() throws Exception {
+               
+               // test data
+               final KafkaTopicPartition testPartition1 = new 
KafkaTopicPartition("test", 42);
+               final KafkaTopicPartition testPartition2 = new 
KafkaTopicPartition("another", 99);
+               
+               final Map<KafkaTopicPartition, Long> testCommitData1 = new 
HashMap<>();
+               testCommitData1.put(testPartition1, 11L);
+               testCommitData1.put(testPartition2, 18L);
+
+               final Map<KafkaTopicPartition, Long> testCommitData2 = new 
HashMap<>();
+               testCommitData2.put(testPartition1, 19L);
+               testCommitData2.put(testPartition2, 28L);
+
+               final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> 
commitStore = new LinkedBlockingQueue<>();
+
+
+               // ----- the mock consumer with poll(), wakeup(), and 
commit(A)sync calls ----
+
+               final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
+                               blockerLatch.await();
+                               return ConsumerRecords.empty();
+                       }
+               });
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               blockerLatch.trigger();
+                               return null;
+                       }
+               }).when(mockConsumer).wakeup();
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               @SuppressWarnings("unchecked")
+                               Map<TopicPartition, OffsetAndMetadata> offsets 
= 
+                                               (Map<TopicPartition, 
OffsetAndMetadata>) invocation.getArguments()[0];
+
+                               OffsetCommitCallback callback = 
(OffsetCommitCallback) invocation.getArguments()[1];
+
+                               commitStore.add(offsets);
+                               callback.onComplete(offsets, null);
+
+                               return null; 
+                       }
+               }).when(mockConsumer).commitAsync(
+                               Mockito.<Map<TopicPartition, 
OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+               // make sure the fetcher creates the mock consumer
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- create the test fetcher -----
+
+               @SuppressWarnings("unchecked")
+               SourceContext<String> sourceContext = mock(SourceContext.class);
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                               sourceContext, topics, null, null, context, 
schema, new Properties(), 0L, false);
+
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // ----- trigger the first offset commit -----
+
+               fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+               Map<TopicPartition, OffsetAndMetadata> result1 = 
commitStore.take();
+
+               for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
+                       TopicPartition partition = entry.getKey();
+                       if (partition.topic().equals("test")) {
+                               assertEquals(42, partition.partition());
+                               assertEquals(11L, entry.getValue().offset());
+                       }
+                       else if (partition.topic().equals("another")) {
+                               assertEquals(99, partition.partition());
+                               assertEquals(18L, entry.getValue().offset());
+                       }
+               }
+
+               // ----- trigger the second offset commit -----
+
+               fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+               Map<TopicPartition, OffsetAndMetadata> result2 = 
commitStore.take();
+
+               for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
+                       TopicPartition partition = entry.getKey();
+                       if (partition.topic().equals("test")) {
+                               assertEquals(42, partition.partition());
+                               assertEquals(19L, entry.getValue().offset());
+                       }
+                       else if (partition.topic().equals("another")) {
+                               assertEquals(99, partition.partition());
+                               assertEquals(28L, entry.getValue().offset());
+                       }
+               }
+               
+               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
+               fetcher.cancel();
+               fetcherRunner.join();
+
+               // check that there were no errors in the fetcher
+               final Throwable caughtError = error.get();
+               if (caughtError != null) {
+                       throw new Exception("Exception in the fetcher", 
caughtError);
+               }
+       }
+}

Reply via email to