This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new dede06c  KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)
dede06c is described below

commit dede06c71ce5be88f74864981f874bf22c903145
Author: Paul <pgwha...@gmail.com>
AuthorDate: Tue Aug 13 10:16:55 2019 -0500

    KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)
    
    When calling readLogToEnd(), the KafkaBasedLog worker thread should catch the TimeoutException that can occur when brokers are unavailable and log a warning; otherwise the worker thread terminates.
    
    Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix.
    
    Author: Paul Whalen <pgwha...@gmail.com>
    Reviewers: Randall Hauch <rha...@gmail.com>, Arjun Satish <ar...@confluent.io>, Ryanne Dolan <ryannedo...@gmail.com>
---
 .../kafka/clients/consumer/MockConsumer.java       | 35 ++++++++--
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 ++
 .../kafka/connect/util/KafkaBasedLogTest.java      | 76 +++++++++++++++++++++-
 .../internals/GlobalStateManagerImplTest.java      |  2 +-
 .../internals/GlobalStreamThreadTest.java          |  2 +-
 .../internals/StoreChangelogReaderTest.java        |  2 +-
 .../processor/internals/StreamThreadTest.java      |  2 +-
 7 files changed, 112 insertions(+), 12 deletions(-)
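
The fix follows a standard catch-and-retry pattern: a recoverable TimeoutException raised inside the work thread's loop is logged and the iteration restarted, rather than being allowed to propagate and terminate the thread. A minimal, self-contained sketch of that pattern (the class and method names below are illustrative stand-ins, not the actual KafkaBasedLog internals):

    import org.apache.kafka.common.errors.TimeoutException;

    // Illustrative stand-in for the KafkaBasedLog work thread; not the real class.
    class WorkLoopSketch implements Runnable {
        private volatile boolean stopRequested = false;

        // Stand-in for readToLogEnd(); offset queries on the underlying consumer
        // may throw TimeoutException when brokers are unavailable.
        private void readToLogEnd() throws TimeoutException {
            // consumer.endOffsets(...), consumer.poll(...), etc.
        }

        @Override
        public void run() {
            while (!stopRequested) {
                try {
                    readToLogEnd();
                    // ... complete any pending read-to-end callbacks ...
                } catch (TimeoutException e) {
                    // Recoverable: warn and retry on the next iteration
                    // instead of letting the exception kill the thread.
                    System.err.println("Timeout reading to log end, retrying: " + e.getMessage());
                }
            }
        }
    }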

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ce6a60b..d936780 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -58,7 +58,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Set<TopicPartition> paused;
 
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private KafkaException exception;
+    private KafkaException pollException;
+    private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;
 
@@ -71,7 +72,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
         this.pollTasks = new LinkedList<>();
-        this.exception = null;
+        this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
     }
@@ -170,9 +171,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             throw new WakeupException();
         }
 
-        if (exception != null) {
-            RuntimeException exception = this.exception;
-            this.exception = null;
+        if (pollException != null) {
+            RuntimeException exception = this.pollException;
+            this.pollException = null;
             throw exception;
         }
 
@@ -213,8 +214,20 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         recs.add(record);
     }
 
+    /**
+     * @deprecated Use {@link #setPollException(KafkaException)} instead
+     */
+    @Deprecated
     public synchronized void setException(KafkaException exception) {
-        this.exception = exception;
+        setPollException(exception);
+    }
+
+    public synchronized void setPollException(KafkaException exception) {
+        this.pollException = exception;
+    }
+
+    public synchronized void setOffsetsException(KafkaException exception) {
+        this.offsetsException = exception;
     }
 
     @Override
@@ -388,6 +401,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long beginningOffset = beginningOffsets.get(tp);
@@ -400,6 +418,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long endOffset = getEndOffset(endOffsets.get(tp));
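
As a usage note on the MockConsumer enhancement above: a test can now arm a one-shot exception for offset queries, mirroring the existing poll-exception hook. A minimal sketch, with an illustrative topic name and offsets (not taken from the patch):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class OffsetsExceptionSketch {
        public static void main(String[] args) {
            MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
            consumer.updateEndOffsets(Collections.singletonMap(tp, 0L));

            // Arm a one-shot exception: the next offsets query throws, then the field is cleared.
            consumer.setOffsetsException(new TimeoutException("simulated broker timeout"));
            try {
                consumer.endOffsets(Collections.singleton(tp)); // throws TimeoutException
            } catch (TimeoutException e) {
                // expected on the first query only
            }
            consumer.endOffsets(Collections.singleton(tp)); // succeeds; the exception was cleared
        }
    }
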
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 9d77d21..e78276a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -312,6 +313,10 @@ public class KafkaBasedLog<K, V> {
                         try {
                             readToLogEnd();
                             log.trace("Finished read to end log for topic {}", topic);
+                        } catch (TimeoutException e) {
+                            log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
+                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                            continue;
                         } catch (WakeupException e) {
                             // Either received another get() call and need to retry reading to end of log or stop() was
                             // called. Both are handled by restarting this loop.
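
A design note on the hunk above: catching TimeoutException and issuing continue restarts the work loop, the same recovery path the existing WakeupException branch relies on ("handled by restarting this loop"), so a transient broker outage now produces a logged warning and a retry on the next iteration instead of thread death. Only e.getMessage() is included in the warning, presumably because the condition is expected and transient.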
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 1af6e34..080f943 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -370,7 +371,7 @@ public class KafkaBasedLogTest {
     }
 
     @Test
-    public void testConsumerError() throws Exception {
+    public void testPollConsumerError() throws Exception {
         expectStart();
         expectStop();
 
@@ -388,7 +389,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+                        consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
                     }
                 });
 
@@ -424,6 +425,77 @@ public class KafkaBasedLogTest {
     }
 
     @Test
+    public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
+        expectStart();
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        final AtomicBoolean getInvoked = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                getInvoked.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
+                // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 1L);
+                newEndOffsets.put(TP1, 1L);
+                consumer.updateEndOffsets(newEndOffsets);
+                // Set exception to occur when getting offsets to read log to end. It'll be caught in the work thread,
+                // which will retry and eventually get the correct offsets and read log to end.
+                consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));
+                store.readToEnd(readEndFutureCallback);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvoked.get());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testProducerError() throws Exception {
         expectStart();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 6e4f0d5..b9c16cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -302,7 +302,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
         initializeConsumer(2, 0, t1);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(t1);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index c0e0de3..b67b664 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -236,7 +236,7 @@ public class GlobalStreamThreadTest {
             10 * 1000,
             "Input record never consumed");
 
-        mockConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index b13ed53..bf59c21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -158,7 +158,7 @@ public class StoreChangelogReaderTest {
     public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 78d5772..d528bc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1377,7 +1377,7 @@ public class StreamThreadTest {
                 () -> mockRestoreConsumer.position(changelogPartition) == 1L,
                 "Never restore first record");
 
-            mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
+            mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") {
                 @Override
                 public Set<TopicPartition> partitions() {
                     return changelogPartitionSet;
