Repository: kafka
Updated Branches:
  refs/heads/trunk 0785feeb0 -> 1d4a0b881


KAFKA-2667: Fix transient error in KafkaBasedLogTest.

The test required a specific sequence of events for each Consumer.poll() call,
but the MockConsumer.waitForPollThen() method could not guarantee that ordering,
resulting in race conditions. Replace it with MockConsumer.schedulePollTask(),
which queues tasks that are executed one per poll() invocation, so sequences of
events can be scheduled deterministically even in multi-threaded tests.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Guozhang Wang

Closes #333 from ewencp/kafka-2667-kafka-based-log-transient-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d4a0b88
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d4a0b88
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d4a0b88

Branch: refs/heads/trunk
Commit: 1d4a0b8811d82a6465015a71194a712679d63efe
Parents: 0785fee
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Wed Oct 21 11:20:29 2015 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed Oct 21 11:20:29 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/MockConsumer.java    |  58 ++++---
 .../kafka/copycat/util/KafkaBasedLogTest.java   | 164 ++++++++-----------
 2 files changed, 97 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d4a0b88/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 3c0f261..0242d7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -19,27 +19,25 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that 
uses Kafka. This class is <i> not
- * threadsafe </i>. However, you can use the {@link 
#waitForPollThen(Runnable,long)} method to write multithreaded tests
- * where a driver thread waits for {@link #poll(long)} to be called and then 
can safely perform operations during a
- * callback.
+ * threadsafe </i>. However, you can use the {@link 
#schedulePollTask(Runnable)} method to write multithreaded tests
+ * where a driver thread waits for {@link #poll(long)} to be called by a 
background thread and then can safely perform
+ * operations during a callback.
  */
 public class MockConsumer<K, V> implements Consumer<K, V> {
 
@@ -51,7 +49,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
 
-    private AtomicReference<CountDownLatch> pollLatch;
+    private Queue<Runnable> pollTasks;
     private KafkaException exception;
 
     private AtomicBoolean wakeup;
@@ -64,7 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.closed = false;
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
-        this.pollLatch = new AtomicReference<>();
+        this.pollTasks = new LinkedList<>();
         this.exception = null;
         this.wakeup = new AtomicBoolean(false);
     }
@@ -127,13 +125,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> 
{
     public ConsumerRecords<K, V> poll(long timeout) {
         ensureNotClosed();
 
-        CountDownLatch pollLatchCopy = pollLatch.get();
-        if (pollLatchCopy != null) {
-            pollLatch.set(null);
-            pollLatchCopy.countDown();
-            synchronized (pollLatchCopy) {
-                // Will block until caller of waitUntilPollThen() finishes 
their callback.
-            }
+        // Synchronize around the entire execution so new tasks to be 
triggered on subsequent poll calls can be added in
+        // the callback
+        synchronized (pollTasks) {
+            Runnable task = pollTasks.poll();
+            if (task != null)
+                task.run();
         }
 
         if (wakeup.get()) {
@@ -319,23 +316,24 @@ public class MockConsumer<K, V> implements Consumer<K, V> 
{
         wakeup.set(true);
     }
 
-    public void waitForPoll(long timeoutMs) {
-        waitForPollThen(null, timeoutMs);
+    /**
+     * Schedule a task to be executed during a poll(). One enqueued task will 
be executed per {@link #poll(long)}
+     * invocation. You can use this repeatedly to mock out multiple responses 
to poll invocations.
+     * @param task the task to be executed
+     */
+    public void schedulePollTask(Runnable task) {
+        synchronized (pollTasks) {
+            pollTasks.add(task);
+        }
     }
 
-    public void waitForPollThen(Runnable task, long timeoutMs) {
-        CountDownLatch latch = new CountDownLatch(1);
-        synchronized (latch) {
-            pollLatch.set(latch);
-            try {
-                if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS))
-                    throw new TimeoutException("Timed out waiting for consumer 
thread to call poll().");
-            } catch (InterruptedException e) {
-                throw new IllegalStateException("MockConsumer waiting thread 
was interrupted.", e);
+    public void scheduleNopPollTask() {
+        schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // noop
             }
-            if (task != null)
-                task.run();
-        }
+        });
     }
 
     public Set<TopicPartition> paused() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d4a0b88/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
index 96c4bcc..1ff5e73 100644
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
@@ -165,32 +165,39 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new 
Thread("start-consumer-ops-thread") {
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        consumer.schedulePollTask(new Runnable() { // Use first poll task to 
setup sequence of remaining responses to polls
             @Override
             public void run() {
-                // Needs to seek to end to find end offsets
-                consumer.waitForPoll(10000);
-
-                // Should keep polling until it reaches current log end offset 
for all partitions
-                consumer.waitForPollThen(new Runnable() {
+                // Should keep polling until it reaches current log end offset 
for all partitions. Should handle
+                // as many empty polls as needed
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 
TP0_KEY, TP0_VALUE));
                     }
-                }, 10000);
-
-                consumer.waitForPollThen(new Runnable() {
+                });
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 
TP1_KEY, TP1_VALUE));
                     }
-                }, 10000);
+                });
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
             }
-        };
-        startConsumerOpsThread.start();
+        });
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         assertEquals(2, consumedRecords.size());
         assertEquals(TP0_VALUE, consumedRecords.get(0).value());
@@ -227,24 +234,10 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new 
Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(0L, consumer.position(TP0));
+        assertEquals(0L, consumer.position(TP1));
 
         // Set some keys
         final AtomicInteger invoked = new AtomicInteger(0);
@@ -265,72 +258,58 @@ public class KafkaBasedLogTest {
         assertEquals(2, invoked.get());
 
         // Now we should have to wait for the records to be read back when we 
call readToEnd()
-        final CountDownLatch startOffsetUpdateLatch = new CountDownLatch(1);
-        Thread readNewDataThread = new Thread("read-new-data-thread") {
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new 
FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                assertEquals(4, consumedRecords.size());
+                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
+                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+                getInvokedAndPassed.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
-                // Needs to be woken up after calling readToEnd()
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            startOffsetUpdateLatch.await();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException("Interrupted");
-                        }
-                    }
-                }, 10000);
+                // Once we're synchronized in a poll, start the read to end 
and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately 
wakeup this consumer.poll() call without
+                // returning any data.
+                store.readToEnd(readEndFutureCallback);
 
                 // Needs to seek to end to find end offsets
-                consumer.waitForPollThen(new Runnable() {
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        try {
-                            startOffsetUpdateLatch.await();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException("Interrupted");
-                        }
-
                         Map<TopicPartition, Long> newEndOffsets = new 
HashMap<>();
                         newEndOffsets.put(TP0, 2L);
                         newEndOffsets.put(TP1, 2L);
                         consumer.updateEndOffsets(newEndOffsets);
                     }
-                }, 10000);
+                });
 
                 // Should keep polling until it reaches current log end offset 
for all partitions
-                consumer.waitForPollThen(new Runnable() {
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 
TP0_KEY, TP0_VALUE));
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 
TP0_KEY, TP0_VALUE_NEW));
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 
TP1_KEY, TP1_VALUE));
                     }
-                }, 10000);
+                });
 
-                consumer.waitForPollThen(new Runnable() {
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 
TP1_KEY, TP1_VALUE_NEW));
                     }
-                }, 10000);
-            }
-        };
-        readNewDataThread.start();
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
-        FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new 
Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                assertEquals(4, consumedRecords.size());
-                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
-                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
-                getInvokedAndPassed.set(true);
+                });
+
+                // Already have FutureCallback that should be invoked/awaited, 
so no need for follow up finishedLatch
             }
         });
-        store.readToEnd(readEndFutureCallback);
-        startOffsetUpdateLatch.countDown();
-        readNewDataThread.join(10000);
-        assertFalse(readNewDataThread.isAlive());
         readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
         assertTrue(getInvokedAndPassed.get());
 
@@ -349,36 +328,45 @@ public class KafkaBasedLogTest {
 
         PowerMock.replayAll();
 
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new 
Thread("start-consumer-ops-thread") {
+        consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
                 // Trigger exception
-                consumer.waitForPollThen(new Runnable() {
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         
consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
                     }
-                }, 10000);
+                });
 
                 // Should keep polling until it reaches current log end offset 
for all partitions
-                consumer.waitForPollThen(new Runnable() {
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 
TP0_KEY, TP0_VALUE_NEW));
                         consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 
TP0_KEY, TP0_VALUE_NEW));
                     }
-                }, 10000);
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
             }
-        };
-        startConsumerOpsThread.start();
+        });
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
 
         store.stop();
 
@@ -403,24 +391,10 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new 
Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(0L, consumer.position(TP0));
+        assertEquals(0L, consumer.position(TP1));
 
         final AtomicReference<Throwable> setException = new 
AtomicReference<>();
         store.send(TP0_KEY, TP0_VALUE, new 
org.apache.kafka.clients.producer.Callback() {

Reply via email to