Repository: kafka Updated Branches: refs/heads/trunk b65f9a777 -> b45a67ede
KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset commit Author: Shikhar Bhushan <shik...@confluent.io> Reviewers: Ewen Cheslack-Postava <e...@confluent.io> Closes #2139 from shikhar/kafka-4161-deux Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b45a67ed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b45a67ed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b45a67ed Branch: refs/heads/trunk Commit: b45a67ede9021985c8df87c633b225231092c0c9 Parents: b65f9a7 Author: Shikhar Bhushan <shik...@confluent.io> Authored: Thu Dec 1 15:01:09 2016 -0800 Committer: Ewen Cheslack-Postava <m...@ewencp.org> Committed: Thu Dec 1 15:01:09 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/connect/sink/SinkTask.java | 27 ++- .../kafka/connect/sink/SinkTaskContext.java | 9 + .../kafka/connect/runtime/WorkerSinkTask.java | 71 +++++-- .../connect/runtime/WorkerSinkTaskContext.java | 15 ++ .../connect/runtime/WorkerSinkTaskTest.java | 192 ++++++++++++++++++- .../runtime/WorkerSinkTaskThreadedTest.java | 38 ++-- 6 files changed, 301 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 3d0becc..99a2683 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -95,13 +95,30 @@ public abstract class SinkTask implements Task { public abstract void put(Collection<SinkRecord> records); /** - * Flush all records that have been {@link 
#put} for the specified topic-partitions. The - * offsets are provided for convenience, but could also be determined by tracking all offsets - * included in the SinkRecords passed to {@link #put}. + * Flush all records that have been {@link #put(Collection)} for the specified topic-partitions. * - * @param offsets mapping of TopicPartition to committed offset + * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}, + * provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s + * passed to {@link #put}. */ - public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets); + public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { + } + + /** + * Pre-commit hook invoked prior to an offset commit. + * + * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are safe to commit. + * + * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}, + * provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s + * passed to {@link #put}. + * + * @return an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit. 
+ */ + public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { + flush(currentOffsets); + return currentOffsets; + } /** * The SinkTask use this method to create writers for newly assigned partitions in case of partition http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java index 2202cae..14f13d1 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java @@ -79,4 +79,13 @@ public interface SinkTaskContext { * @param partitions the partitions to resume */ void resume(TopicPartition... partitions); + + /** + * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery + * by requesting an offset commit as soon as they flush data to the destination system. + * + * It is only a hint to the runtime and no timing guarantee should be assumed. 
+ */ + void requestCommit(); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 1575581..b941469 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -150,19 +150,21 @@ class WorkerSinkTask extends WorkerTask { } protected void iteration() { + final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + try { long now = time.milliseconds(); // Maybe commit - if (!committing && now >= nextCommit) { + if (!committing && (context.isCommitRequested() || now >= nextCommit)) { commitOffsets(now, false); - nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + nextCommit += offsetCommitIntervalMs; + context.clearCommitRequest(); } // Check for timed out commits - long commitTimeout = commitStarted + workerConfig.getLong( - WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); - if (committing && now >= commitTimeout) { + if (committing && now >= commitTimeoutMs) { log.warn("Commit of {} offsets timed out", this); commitFailures++; committing = false; @@ -267,7 +269,9 @@ class WorkerSinkTask extends WorkerTask { OffsetCommitCallback cb = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { - lastCommittedOffsets = offsets; + if (error == null) { + lastCommittedOffsets = offsets; + } onCommitCompleted(error, seqno); } }; @@ -283,27 +287,58 
@@ class WorkerSinkTask extends WorkerTask { commitSeqno += 1; commitStarted = now; - Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets); + final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets; try { - task.flush(offsets); + taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets)); } catch (Throwable t) { - log.error("Commit of {} offsets failed due to exception while flushing:", this, t); - log.error("Rewinding offsets to last committed offsets"); - for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { - log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); - consumer.seek(entry.getKey(), entry.getValue().offset()); + if (closing) { + log.warn("{} Offset commit failed during close"); + onCommitCompleted(t, commitSeqno); + } else { + log.error("{} Offset commit failed, rewinding to last committed offsets", this, t); + for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { + log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); + consumer.seek(entry.getKey(), entry.getValue().offset()); + } + currentOffsets = new HashMap<>(lastCommittedOffsets); + onCommitCompleted(t, commitSeqno); } - currentOffsets = new HashMap<>(lastCommittedOffsets); - onCommitCompleted(t, commitSeqno); return; } finally { - // Close the task if needed before committing the offsets. This is basically the last chance for - // the connector to actually flush data that has been written to it. + // Close the task if needed before committing the offsets. 
if (closing) task.close(currentOffsets.keySet()); } - doCommit(offsets, closing, commitSeqno); + if (taskProvidedOffsets.isEmpty()) { + log.debug("{} Skipping offset commit, task opted-out", this); + onCommitCompleted(null, commitSeqno); + return; + } + + final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets); + for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) { + final TopicPartition partition = taskProvidedOffsetEntry.getKey(); + final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue(); + if (commitableOffsets.containsKey(partition)) { + if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) { + commitableOffsets.put(partition, taskProvidedOffset); + } else { + log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset); + } + } else { + log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset); + } + } + + if (commitableOffsets.equals(lastCommittedOffsets)) { + log.debug("{} Skipping offset commit, no change since last commit", this); + onCommitCompleted(null, commitSeqno); + return; + } + + log.trace("{} Offsets to commit: {}", this, commitableOffsets); + doCommit(commitableOffsets, closing, commitSeqno); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index c762bdd..ede76c4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -27,6 +27,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext { private long timeoutMs; private KafkaConsumer<byte[], byte[]> consumer; private final Set<TopicPartition> pausedPartitions; + private boolean commitRequested; public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) { this.offsets = new HashMap<>(); @@ -109,4 +110,18 @@ public class WorkerSinkTaskContext implements SinkTaskContext { public Set<TopicPartition> pausedPartitions() { return pausedPartitions; } + + @Override + public void requestCommit() { + commitRequested = true; + } + + public boolean isCommitRequested() { + return commitRequested; + } + + public void clearCommitRequest() { + commitRequested = false; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index ca218c3..1f9e56b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.record.Record; @@ -49,6 +50,7 @@ import org.powermock.api.easymock.annotation.Mock; import 
org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; import java.util.ArrayList; import java.util.Collection; @@ -62,6 +64,9 @@ import java.util.Set; import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @@ -297,15 +302,21 @@ public class WorkerSinkTaskTest { @Test public void testWakeupInCommitSyncCausesRetry() throws Exception { expectInitializeTask(); - expectPollInitialAssignment(); - final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2); + expectPollInitialAssignment(); - sinkTask.close(new HashSet<>(partitions)); + expectConsumerPoll(1); + expectConvertMessages(1); + sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); EasyMock.expectLastCall(); - sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); - EasyMock.expectLastCall(); + final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + sinkTask.preCommit(offsets); + EasyMock.expectLastCall().andReturn(offsets); // first one raises wakeup consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); @@ -315,6 +326,9 @@ public class WorkerSinkTaskTest { consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); EasyMock.expectLastCall(); + sinkTask.close(new HashSet<>(partitions)); + EasyMock.expectLastCall(); + 
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); @@ -344,12 +358,173 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); workerTask.iteration(); // poll for initial assignment + workerTask.iteration(); // first record delivered workerTask.iteration(); // now rebalance with the wakeup triggered PowerMock.verifyAll(); } @Test + public void testRequestCommit() throws Exception { + expectInitializeTask(); + + expectPollInitialAssignment(); + + expectConsumerPoll(1); + expectConvertMessages(1); + sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); + EasyMock.expectLastCall(); + + final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + sinkTask.preCommit(offsets); + EasyMock.expectLastCall().andReturn(offsets); + + final Capture<OffsetCommitCallback> callback = EasyMock.newCapture(); + consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback)); + EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { + @Override + public Void answer() throws Throwable { + callback.getValue().onComplete(offsets, null); + return null; + } + }); + + expectConsumerPoll(0); + sinkTask.put(Collections.<SinkRecord>emptyList()); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + + workerTask.iteration(); // initial assignment + + workerTask.iteration(); // first record delivered + + sinkTaskContext.getValue().requestCommit(); + assertTrue(sinkTaskContext.getValue().isCommitRequested()); + assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); + workerTask.iteration(); // triggers the commit + 
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared + assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); + assertEquals(0, workerTask.commitFailures()); + + PowerMock.verifyAll(); + } + + @Test + public void testPreCommit() throws Exception { + expectInitializeTask(); + + // iter 1 + expectPollInitialAssignment(); + + // iter 2 + expectConsumerPoll(2); + expectConvertMessages(2); + sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); + EasyMock.expectLastCall(); + + final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>(); + workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET)); + workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + + final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); + workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2)); + workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + + final Map<TopicPartition, OffsetAndMetadata> taskOffsets = new HashMap<>(); + taskOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); // act like FIRST_OFFSET+2 has not yet been flushed by the task + taskOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 1)); // should be ignored because > current offset + taskOffsets.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET)); // should be ignored because this partition is not assigned + + final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(); + committableOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + committableOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + + sinkTask.preCommit(workerCurrentOffsets); + EasyMock.expectLastCall().andReturn(taskOffsets); + final Capture<OffsetCommitCallback> callback = 
EasyMock.newCapture(); + consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback)); + EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { + @Override + public Void answer() throws Throwable { + callback.getValue().onComplete(committableOffsets, null); + return null; + } + }); + expectConsumerPoll(0); + sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + workerTask.iteration(); // iter 1 -- initial assignment + + assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets")); + workerTask.iteration(); // iter 2 -- deliver 2 records + + assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets")); + assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); + sinkTaskContext.getValue().requestCommit(); + workerTask.iteration(); // iter 3 -- commit + assertEquals(committableOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); + + PowerMock.verifyAll(); + } + + @Test + public void testIgnoredCommit() throws Exception { + expectInitializeTask(); + + // iter 1 + expectPollInitialAssignment(); + + // iter 2 + expectConsumerPoll(1); + expectConvertMessages(1); + sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); + EasyMock.expectLastCall(); + + final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>(); + workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET)); + workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + + final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); + 
workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + + // iter 3 + sinkTask.preCommit(workerCurrentOffsets); + EasyMock.expectLastCall().andReturn(workerStartingOffsets); + // no actual consumer.commit() triggered + expectConsumerPoll(0); + sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + workerTask.iteration(); // iter 1 -- initial assignment + + assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets")); + assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); + + workerTask.iteration(); // iter 2 -- deliver 1 record + + sinkTaskContext.getValue().requestCommit(); + workerTask.iteration(); // iter 3 -- commit + + PowerMock.verifyAll(); + } + + @Test public void testMissingTimestampPropagation() throws Exception { expectInitializeTask(); expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE); @@ -434,11 +609,8 @@ public class WorkerSinkTaskTest { sinkTask.close(new HashSet<>(partitions)); EasyMock.expectLastCall(); - sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); - EasyMock.expectLastCall(); - - consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); - EasyMock.expectLastCall(); + sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); + EasyMock.expectLastCall().andReturn(Collections.emptyMap()); EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 52a86ab..8fa62b6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -181,7 +181,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { // Make each poll() take the offset commit interval Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, null, null, 0, true); + expectOffsetCommit(1L, null, null, 0, true); expectStopTask(); PowerMock.replayAll(); @@ -207,12 +207,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { } @Test - public void testCommitTaskFlushFailure() throws Exception { + public void testCommitFailure() throws Exception { expectInitializeTask(); expectPollInitialAssignment(); Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, new RuntimeException(), null, 0, true); + expectOffsetCommit(1L, new RuntimeException(), null, 0, true); // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization // for all topic partitions consumer.seek(TOPIC_PARTITION, FIRST_OFFSET); @@ -244,14 +244,14 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { } @Test - public void testCommitTaskSuccessAndFlushFailure() throws Exception { - // Validate that we rewind to the correct offsets if a task's flush method throws an exception + 
public void testCommitSuccessFollowedByFailure() throws Exception { + // Validate that we rewind to the correct offsets if a task's preCommit() method throws an exception expectInitializeTask(); expectPollInitialAssignment(); Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, null, null, 0, true); - expectOffsetFlush(2L, new RuntimeException(), null, 0, true); + expectOffsetCommit(1L, null, null, 0, true); + expectOffsetCommit(2L, new RuntimeException(), null, 0, true); // Should rewind to last known committed positions consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1); PowerMock.expectLastCall(); @@ -290,7 +290,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, null, new Exception(), 0, true); + expectOffsetCommit(1L, null, new Exception(), 0, true); expectStopTask(); PowerMock.replayAll(); @@ -322,7 +322,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); - expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); + expectOffsetCommit(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); expectStopTask(); PowerMock.replayAll(); @@ -633,11 +633,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { return EasyMock.expectLastCall(); } - private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages, - final RuntimeException flushError, - final Exception consumerCommitError, - final long consumerCommitDelayMs, - final boolean invokeCallback) + private Capture<OffsetCommitCallback> expectOffsetCommit(final long expectedMessages, + final 
RuntimeException error, + final Exception consumerCommitError, + final long consumerCommitDelayMs, + final boolean invokeCallback) throws Exception { final long finalOffset = FIRST_OFFSET + expectedMessages; @@ -646,11 +646,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)); offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); - sinkTask.flush(offsetsToCommit); - IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall(); - if (flushError != null) { - flushExpectation.andThrow(flushError).once(); + sinkTask.preCommit(offsetsToCommit); + IExpectationSetters<Object> expectation = PowerMock.expectLastCall(); + if (error != null) { + expectation.andThrow(error).once(); return null; + } else { + expectation.andReturn(offsetsToCommit); } final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();