Repository: kafka Updated Branches: refs/heads/trunk 7cc6e0e2d -> 9ff54cb5d
MINOR: Catch Throwable in commitSourceTask() Author: Liquan Pei <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1402 from Ishiihara/source-task-commit-record Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ff54cb5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ff54cb5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ff54cb5 Branch: refs/heads/trunk Commit: 9ff54cb5dd99fab83533ddfdf0ed89508e5525fb Parents: 7cc6e0e Author: Liquan Pei <[email protected]> Authored: Tue Jun 14 13:21:30 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Jun 14 13:21:30 2016 -0700 ---------------------------------------------------------------------- .../kafka/connect/runtime/WorkerSourceTask.java | 6 +- .../connect/runtime/WorkerSourceTaskTest.java | 79 ++++++++++++++++---- 2 files changed, 67 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff54cb5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index fd551ab..83d1c84 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -243,6 +243,8 @@ class WorkerSourceTask extends WorkerTask { task.commitRecord(record); } catch (InterruptedException e) { log.error("Exception thrown", e); + } catch (Throwable t) { + log.error("Exception thrown while calling task.commitRecord()", t); } } @@ -366,8 +368,8 @@ class WorkerSourceTask extends WorkerTask { this.task.commit(); } catch (InterruptedException ex) { log.warn("Commit interrupted", ex); - } catch (Throwable ex) { - log.error("Exception thrown while calling task.commit()", ex); + } catch (Throwable t) { + log.error("Exception thrown while calling task.commit()", t); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff54cb5/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 0768781..0761245 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; @@ -52,6 +53,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) public class WorkerSourceTaskTest extends ThreadedTest { + private final Random random = new Random(); private static final String TOPIC = "topic"; private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes()); private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12); @@ -197,7 +200,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(TASK_CONFIG); executor.submit(workerTask); - awaitPolls(pollLatch); + awaitLatch(pollLatch); workerTask.transitionTo(TargetState.PAUSED); @@ -238,7 +241,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(TASK_CONFIG); executor.submit(workerTask); - awaitPolls(pollLatch); + awaitLatch(pollLatch); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); @@ -271,7 +274,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(TASK_CONFIG); executor.submit(workerTask); - awaitPolls(pollLatch); + awaitLatch(pollLatch); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); @@ -306,7 +309,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(TASK_CONFIG); executor.submit(workerTask); - awaitPolls(pollLatch); + awaitLatch(pollLatch); assertTrue(workerTask.commitOffsets()); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); @@ -341,7 +344,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(TASK_CONFIG); executor.submit(workerTask); - awaitPolls(pollLatch); + awaitLatch(pollLatch); assertTrue(workerTask.commitOffsets()); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); @@ -404,6 +407,30 @@ public class WorkerSourceTaskTest extends ThreadedTest { } @Test + public void testSendRecordsTaskCommitRecordFail() throws Exception { + createWorkerTask(); + + // Differentiate only by Kafka partition so we can reuse conversion expectations + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + // Source task commit record failure will not cause the task to abort + expectSendRecordOnce(false); + expectSendRecordTaskCommitRecordFail(false, false); + expectSendRecordOnce(false); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + + PowerMock.verifyAll(); + } + + @Test public void testSlowTaskStart() throws Exception { final CountDownLatch startupLatch = new CountDownLatch(1); @@ -435,7 +462,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it // cannot be invoked immediately in the thread trying to stop the task. - startupLatch.await(1000, TimeUnit.MILLISECONDS); + awaitLatch(startupLatch); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); @@ -479,14 +506,22 @@ public class WorkerSourceTaskTest extends ThreadedTest { } private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException { - return expectSendRecord(true, false); + return expectSendRecordTaskCommitRecordSucceed(true, false); } private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) throws InterruptedException { - return expectSendRecord(false, isRetry); + return expectSendRecordTaskCommitRecordSucceed(false, isRetry); + } + + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException { + return expectSendRecord(anyTimes, isRetry, true); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry) throws InterruptedException { + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException { + return expectSendRecord(anyTimes, isRetry, false); + } + + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException { expectConvertKeyValue(anyTimes); Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); @@ -523,11 +558,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { expect.andAnswer(expectResponse); // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit - sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class)); - if (anyTimes) - EasyMock.expectLastCall().anyTimes(); - else - EasyMock.expectLastCall(); + expectTaskCommitRecord(anyTimes, succeed); return sent; } @@ -545,8 +576,24 @@ public class WorkerSourceTaskTest extends ThreadedTest { convertValueExpect.andReturn(SERIALIZED_RECORD); } - private boolean awaitPolls(CountDownLatch latch) throws InterruptedException { - return latch.await(1000, TimeUnit.MILLISECONDS); + private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException { + sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class)); + IExpectationSetters<Void> expect = EasyMock.expectLastCall(); + if (!succeed) { + expect = expect.andThrow(new InterruptException("Error committing record in source task")); + } + if (anyTimes) { + expect.anyTimes(); + } + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + } + return false; } @SuppressWarnings("unchecked")
