Repository: kafka
Updated Branches:
  refs/heads/trunk d6e900a18 -> 0f00ec97a


KAFKA-2859: Fix deadlock in WorkerSourceTask.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes #554 from ewencp/kafka-2859-deadlock-worker-source-task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0f00ec97
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0f00ec97
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0f00ec97

Branch: refs/heads/trunk
Commit: 0f00ec97ae328d04c29cb1cb3eabad3f17e31582
Parents: d6e900a
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Wed Nov 18 14:19:37 2015 -0800
Committer: Gwen Shapira <[email protected]>
Committed: Wed Nov 18 14:19:37 2015 -0800

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java | 112 +++++++++++------
 .../connect/runtime/WorkerSourceTaskTest.java   | 124 +++++++++++++++----
 2 files changed, 176 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0f00ec97/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 5d0b7e7..7178542 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -35,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +50,8 @@ import java.util.concurrent.TimeoutException;
 class WorkerSourceTask implements WorkerTask {
     private static final Logger log = 
LoggerFactory.getLogger(WorkerSourceTask.class);
 
+    private static final long SEND_FAILED_BACKOFF_MS = 100;
+
     private final ConnectorTaskId id;
     private final SourceTask task;
     private final Converter keyConverter;
@@ -57,12 +63,15 @@ class WorkerSourceTask implements WorkerTask {
     private final WorkerConfig workerConfig;
     private final Time time;
 
+    private List<SourceRecord> toSend;
+    private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
     // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
     // there is no IdentityHashSet.
     private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessages;
     // A second buffer is used while an offset flush is running
     private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
     private boolean flushing;
+    private CountDownLatch stopRequestedLatch;
 
     public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
                             Converter keyConverter, Converter valueConverter,
@@ -79,9 +88,12 @@ class WorkerSourceTask implements WorkerTask {
         this.workerConfig = workerConfig;
         this.time = time;
 
+        this.toSend = null;
+        this.lastSendFailed = false;
         this.outstandingMessages = new IdentityHashMap<>();
         this.outstandingMessagesBacklog = new IdentityHashMap<>();
         this.flushing = false;
+        this.stopRequestedLatch = new CountDownLatch(1);
     }
 
     @Override
@@ -92,8 +104,10 @@ class WorkerSourceTask implements WorkerTask {
 
     @Override
     public void stop() {
-        if (workThread != null)
+        if (workThread != null) {
             workThread.startGracefulShutdown();
+            stopRequestedLatch.countDown();
+        }
     }
 
     @Override
@@ -117,47 +131,69 @@ class WorkerSourceTask implements WorkerTask {
     }
 
     /**
-     * Send a batch of records. This is atomic up to the point of getting the messages into the
-     * Producer and recorded in our set of outstanding messages, so either all or none will be sent
-     * @param records
+     * Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
+     * be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
+     * @return true if all messages were sent, false if some need to be retried
      */
-    private synchronized void sendRecords(List<SourceRecord> records) {
-        for (final SourceRecord record : records) {
+    private boolean sendRecords() {
+        int processed = 0;
+        for (final SourceRecord record : toSend) {
             byte[] key = keyConverter.fromConnectData(record.topic(), 
record.keySchema(), record.key());
             byte[] value = valueConverter.fromConnectData(record.topic(), 
record.valueSchema(), record.value());
             final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
             log.trace("Appending record with key {}, value {}", record.key(), 
record.value());
-            if (!flushing) {
-                outstandingMessages.put(producerRecord, producerRecord);
-            } else {
-                outstandingMessagesBacklog.put(producerRecord, producerRecord);
+            // We need this queued first since the callback could happen immediately (even synchronously in some cases).
+            // Because of this we need to be careful about handling retries -- we always save the previously attempted
+            // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding
+            // messages and update the offsets.
+            synchronized (this) {
+                if (!lastSendFailed) {
+                    if (!flushing) {
+                        outstandingMessages.put(producerRecord, 
producerRecord);
+                    } else {
+                        outstandingMessagesBacklog.put(producerRecord, 
producerRecord);
+                    }
+                    // Offsets are converted & serialized in the OffsetWriter
+                    offsetWriter.offset(record.sourcePartition(), 
record.sourceOffset());
+                }
             }
-            producer.send(
-                    producerRecord,
-                    new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                            if (e != null) {
-                                // Given the default settings for zero data 
loss, this should basically never happen --
-                                // between "infinite" retries, indefinite 
blocking on full buffers, and "infinite" request
-                                // timeouts, callbacks with exceptions should 
never be invoked in practice. If the
-                                // user overrode these settings, the best we 
can do is notify them of the failure via
-                                // logging.
-                                log.error("{} failed to send record to {}: 
{}", id, record.topic(), e);
-                                log.debug("Failed record: topic {}, Kafka 
partition {}, key {}, value {}, source offset {}, source partition {}",
-                                        record.topic(), 
record.kafkaPartition(), record.key(), record.value(),
-                                        record.sourceOffset(), 
record.sourcePartition());
-                            } else {
-                                log.trace("Wrote record successfully: topic {} 
partition {} offset {}",
-                                        recordMetadata.topic(), 
recordMetadata.partition(),
-                                        recordMetadata.offset());
+            try {
+                producer.send(
+                        producerRecord,
+                        new Callback() {
+                            @Override
+                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
+                                if (e != null) {
+                                    // Given the default settings for zero 
data loss, this should basically never happen --
+                                    // between "infinite" retries, indefinite 
blocking on full buffers, and "infinite" request
+                                    // timeouts, callbacks with exceptions 
should never be invoked in practice. If the
+                                    // user overrode these settings, the best 
we can do is notify them of the failure via
+                                    // logging.
+                                    log.error("{} failed to send record to {}: 
{}", id, record.topic(), e);
+                                    log.debug("Failed record: topic {}, Kafka 
partition {}, key {}, value {}, source offset {}, source partition {}",
+                                            record.topic(), 
record.kafkaPartition(), record.key(), record.value(),
+                                            record.sourceOffset(), 
record.sourcePartition());
+                                } else {
+                                    log.trace("Wrote record successfully: 
topic {} partition {} offset {}",
+                                            recordMetadata.topic(), 
recordMetadata.partition(),
+                                            recordMetadata.offset());
+                                }
+                                recordSent(producerRecord);
                             }
-                            recordSent(producerRecord);
-                        }
-                    });
-            // Offsets are converted & serialized in the OffsetWriter
-            offsetWriter.offset(record.sourcePartition(), 
record.sourceOffset());
+                        });
+                lastSendFailed = false;
+            } catch (RetriableException e) {
+                log.warn("Failed to send {}, backing off before retrying:", 
producerRecord, e);
+                toSend = toSend.subList(processed, toSend.size());
+                lastSendFailed = true;
+                return false;
+            } catch (KafkaException e) {
+                throw new ConnectException("Unrecoverable exception trying to 
send", e);
+            }
+            processed++;
         }
+        toSend = null;
+        return true;
     }
 
     private synchronized void recordSent(final ProducerRecord<byte[], byte[]> 
record) {
@@ -309,10 +345,12 @@ class WorkerSourceTask implements WorkerTask {
                 }
 
                 while (getRunning()) {
-                    List<SourceRecord> records = task.poll();
-                    if (records == null)
+                    if (toSend == null)
+                        toSend = task.poll();
+                    if (toSend == null)
                         continue;
-                    sendRecords(records);
+                    if (!sendRecords())
+                        stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, 
TimeUnit.MILLISECONDS);
                 }
             } catch (InterruptedException e) {
                 // Ignore and allow to exit.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f00ec97/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 9a382b6..7380f1c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -198,11 +198,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // Can just use the same record for key and value
         records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
 
         PowerMock.replayAll();
 
-        Whitebox.invokeMethod(workerTask, "sendRecords", records);
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
         assertEquals(SERIALIZED_KEY, sent.getValue().key());
         assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
@@ -210,6 +211,40 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     @Test
+    public void testSendRecordsRetries() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion 
expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectSendRecordOnce(false);
+        // Any Producer retriable exception should work here
+        expectSendRecordSyncFailure(new 
org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
+
+        // Second round
+        expectSendRecordOnce(true);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record2, record3), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testSlowTaskStart() throws Exception {
         createWorkerTask();
 
@@ -252,38 +287,81 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                     }
                 });
         // Fallout of the poll() call
-        expectSendRecord();
+        expectSendRecordAnyTimes();
         return latch;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws 
InterruptedException {
-        EasyMock.expect(keyConverter.fromConnectData(TOPIC, KEY_SCHEMA, 
KEY)).andStubReturn(SERIALIZED_KEY);
-        EasyMock.expect(valueConverter.fromConnectData(TOPIC, RECORD_SCHEMA, 
RECORD)).andStubReturn(SERIALIZED_RECORD);
+    private void expectSendRecordSyncFailure(Throwable error) throws 
InterruptedException {
+        expectConvertKeyValue(false);
+
+        offsetWriter.offset(PARTITION, OFFSET);
+        PowerMock.expectLastCall();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-        // 1. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
         EasyMock.expect(
+                producer.send(EasyMock.anyObject(ProducerRecord.class),
+                        
EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
+                .andThrow(error);
+    }
+
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
throws InterruptedException {
+        return expectSendRecord(true, false);
+    }
+
+    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordOnce(boolean isRetry) throws InterruptedException {
+        return expectSendRecord(false, isRetry);
+    }
+
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean 
anyTimes, boolean isRetry) throws InterruptedException {
+        expectConvertKeyValue(anyTimes);
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
+
+        // 1. Offset data is passed to the offset storage.
+        if (!isRetry) {
+            offsetWriter.offset(PARTITION, OFFSET);
+            if (anyTimes)
+                PowerMock.expectLastCall().anyTimes();
+            else
+                PowerMock.expectLastCall();
+        }
+
+        // 2. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
+        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
                 producer.send(EasyMock.capture(sent),
-                        EasyMock.capture(producerCallbacks)))
-                .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
-                    @Override
-                    public Future<RecordMetadata> answer() throws Throwable {
-                        synchronized (producerCallbacks) {
-                            for (org.apache.kafka.clients.producer.Callback cb 
: producerCallbacks.getValues()) {
-                                cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0), null);
-                            }
-                            producerCallbacks.reset();
-                        }
-                        return sendFuture;
+                        EasyMock.capture(producerCallbacks)));
+        IAnswer<Future<RecordMetadata>> expectResponse = new 
IAnswer<Future<RecordMetadata>>() {
+            @Override
+            public Future<RecordMetadata> answer() throws Throwable {
+                synchronized (producerCallbacks) {
+                    for (org.apache.kafka.clients.producer.Callback cb : 
producerCallbacks.getValues()) {
+                        cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0), null);
                     }
-                });
-        // 2. Offset data is passed to the offset storage.
-        offsetWriter.offset(PARTITION, OFFSET);
-        PowerMock.expectLastCall().anyTimes();
+                    producerCallbacks.reset();
+                }
+                return sendFuture;
+            }
+        };
+        if (anyTimes)
+            expect.andStubAnswer(expectResponse);
+        else
+            expect.andAnswer(expectResponse);
 
         return sent;
     }
 
+    private void expectConvertKeyValue(boolean anyTimes) {
+        IExpectationSetters<byte[]> convertKeyExpect = 
EasyMock.expect(keyConverter.fromConnectData(TOPIC, KEY_SCHEMA, KEY));
+        if (anyTimes)
+            convertKeyExpect.andStubReturn(SERIALIZED_KEY);
+        else
+            convertKeyExpect.andReturn(SERIALIZED_KEY);
+        IExpectationSetters<byte[]> convertValueExpect = 
EasyMock.expect(valueConverter.fromConnectData(TOPIC, RECORD_SCHEMA, RECORD));
+        if (anyTimes)
+            convertValueExpect.andStubReturn(SERIALIZED_RECORD);
+        else
+            convertValueExpect.andReturn(SERIALIZED_RECORD);
+    }
+
     private void awaitPolls(CountDownLatch latch) throws InterruptedException {
         latch.await(1000, TimeUnit.MILLISECONDS);
     }

Reply via email to