Repository: kafka Updated Branches: refs/heads/trunk e8343e67e -> 1d80f563b
KAFKA-3092: Replace SinkTask onPartitionsAssigned/onPartitionsRevoked with open/close Author: Jason Gustafson <[email protected]> Reviewers: Liquan Pei <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #815 from hachikuji/KAFKA-3092 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d80f563 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d80f563 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d80f563 Branch: refs/heads/trunk Commit: 1d80f563bcd043cd464003782802906b60a0ade8 Parents: e8343e6 Author: Jason Gustafson <[email protected]> Authored: Wed Feb 3 11:28:58 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Feb 3 11:28:58 2016 -0800
---------------------------------------------------------------------- .../org/apache/kafka/connect/sink/SinkTask.java | 64 ++++- .../apache/kafka/connect/runtime/Worker.java | 22 +- .../kafka/connect/runtime/WorkerSinkTask.java | 257 +++++++++++-------- .../connect/runtime/WorkerSinkTaskThread.java | 112 -------- .../kafka/connect/runtime/WorkerSourceTask.java | 152 +++++------ .../kafka/connect/runtime/WorkerTask.java | 81 +++++- .../connect/runtime/WorkerSinkTaskTest.java | 66 ++--- .../runtime/WorkerSinkTaskThreadedTest.java | 196 ++++++++------ .../connect/runtime/WorkerSourceTaskTest.java | 15 +- .../kafka/connect/runtime/WorkerTest.java | 6 +- 10 files changed, 512 insertions(+), 459 deletions(-) ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 85ce88a..3d0becc 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -25,9 +25,32 @@ import java.util.Collection; import java.util.Map; /** - * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In - * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush} - * to support offset commits. + * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task + * instance is assigned a set of partitions by the Connect framework and will handle all records received + * from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the + * {@link #put(Collection)} API, which should either write them to the downstream system or batch them for + * later writing. Periodically, Connect will call {@link #flush(Map)} to ensure that batched records are + * actually pushed to the downstream system. + * + * Below we describe the lifecycle of a SinkTask. + * + * <ol> + * <li><b>Initialization:</b> SinkTasks are first initialized using {@link #initialize(SinkTaskContext)} + * to prepare the task's context and {@link #start(Map)} to accept configuration and start any services + * needed for processing.</li> + * <li><b>Partition Assignment:</b> After initialization, Connect will assign the task a set of partitions + * using {@link #open(Collection)}. These partitions are owned exclusively by this task until they + have been closed with {@link #close(Collection)}.</li> + * <li><b>Record Processing:</b> Once partitions have been opened for writing, Connect will begin forwarding + * records from Kafka using the {@link #put(Collection)} API. Periodically, Connect will ask the task + * to flush records using {@link #flush(Map)} as described above.</li> + * <li><b>Partition Rebalancing:</b> Occasionally, Connect will need to change the assignment of this task. + * When this happens, the currently assigned partitions will be closed with {@link #close(Collection)} and + * the new assignment will be opened using {@link #open(Collection)}.</li> + * <li><b>Shutdown:</b> When the task needs to be shut down, Connect will close active partitions (if there + * are any) and stop the task using {@link #stop()}.</li> + * </ol> + * */ @InterfaceStability.Unstable public abstract class SinkTask implements Task { @@ -42,6 +65,11 @@ public abstract class SinkTask implements Task { protected SinkTaskContext context; + /** + * Initialize the context of this task. Note that the partition assignment will be empty until + * Connect has opened the partitions for writing with {@link #open(Collection)}. + * @param context The sink task's context + */ public void initialize(SinkTaskContext context) { this.context = context; } @@ -77,24 +105,38 @@ public abstract class SinkTask implements Task { /** * The SinkTask uses this method to create writers for newly assigned partitions in case of partition - re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask. - The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions. - This method will be called after partition re-assignment completes and before the SinkTask starts + rebalance. This method will be called after partition re-assignment completes and before the SinkTask starts * fetching data. Note that any errors raised from this method will cause the task to stop. * @param partitions The list of partitions that are now assigned to the task (may include * partitions previously assigned to the task) */ + public void open(Collection<TopicPartition> partitions) { + this.onPartitionsAssigned(partitions); + } + + /** + * @deprecated Use {@link #open(Collection)} for partition initialization. + */ + @Deprecated public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } /** - * The SinkTask use this method to close writers and commit offsets for partitions that are no + * The SinkTask uses this method to close writers for partitions that are no * longer assigned to the SinkTask. This method will be called before a rebalance operation starts - * and after the SinkTask stops fetching data. Note that any errors raised from this method will cause - * the task to stop. - * @param partitions The list of partitions that were assigned to the consumer on the last - rebalance + * and after the SinkTask stops fetching data. After being closed, Connect will not write + * any records to the task until a new set of partitions has been opened. Note that any errors raised + * from this method will cause the task to stop. + * @param partitions The list of partitions that should be closed + */ + public void close(Collection<TopicPartition> partitions) { + this.onPartitionsRevoked(partitions); + } + + /** + * @deprecated Use {@link #close(Collection)} instead for partition cleanup. 
*/ + @Deprecated public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 88b4c10..0a4bb7f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -17,11 +17,11 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; @@ -33,8 +33,8 @@ import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; -import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; +import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.reflections.Reflections; import org.reflections.util.ClasspathHelper; @@ -48,6 +48,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** @@ -62,8 +64,10 @@ import java.util.Set; public class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); - private Time time; - private WorkerConfig config; + private final ExecutorService executor; + private final Time time; + private final WorkerConfig config; + private Converter keyConverter; private Converter valueConverter; private Converter internalKeyConverter; @@ -80,6 +84,7 @@ public class Worker { @SuppressWarnings("unchecked") public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) { + this.executor = Executors.newCachedThreadPool(); this.time = time; this.config = config; this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); @@ -154,7 +159,6 @@ public class Worker { log.debug("Waiting for task {} to finish shutting down", task); if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) log.error("Graceful shutdown of task {} failed.", task); - task.close(); } long timeoutMs = limit - time.milliseconds(); @@ -342,7 +346,9 @@ public class Worker { // Start the task before adding or modifying any state; any exceptions are caught higher up the // call chain and there's no cleanup to do here - workerTask.start(taskConfig.originalsStrings()); + workerTask.initialize(taskConfig.originalsStrings()); + executor.submit(workerTask); + if (task instanceof SourceTask) { WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; sourceTaskOffsetCommitter.schedule(id, workerSourceTask); @@ -367,7 
+373,6 @@ public class Worker { task.stop(); if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) log.error("Graceful stop of task {} failed.", task); - task.close(); tasks.remove(id); } @@ -394,4 +399,5 @@ public class Worker { public Converter getInternalValueConverter() { return internalValueConverter; } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index f48a734..8c5bd9f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -48,119 +48,155 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; /** * WorkerTask that uses a SinkTask to export data from Kafka. */ -class WorkerSinkTask implements WorkerTask { +class WorkerSinkTask extends WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); - private final ConnectorTaskId id; - private final SinkTask task; private final WorkerConfig workerConfig; + private final SinkTask task; + private Map<String, String> taskConfig; private final Time time; private final Converter keyConverter; private final Converter valueConverter; - private WorkerSinkTaskThread workThread; - private Map<String, String> taskProps; private KafkaConsumer<byte[], byte[]> consumer; private WorkerSinkTaskContext context; - private boolean started; private final List<SinkRecord> messageBatch; private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets; private Map<TopicPartition, OffsetAndMetadata> currentOffsets; - private boolean pausedForRedelivery; private RuntimeException rebalanceException; + private long nextCommit; + private int commitSeqno; + private long commitStarted; + private int commitFailures; + private boolean pausedForRedelivery; + private boolean committing; + + public WorkerSinkTask(ConnectorTaskId id, + SinkTask task, + WorkerConfig workerConfig, + Converter keyConverter, + Converter valueConverter, + Time time) { + super(id); - public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, - Converter keyConverter, Converter valueConverter, Time time) { - this.id = id; - this.task = task; this.workerConfig = workerConfig; + this.task = task; this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.time = time; - this.started = false; this.messageBatch = new ArrayList<>(); this.currentOffsets = new HashMap<>(); this.pausedForRedelivery = false; this.rebalanceException = null; + this.nextCommit = time.milliseconds() + + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + this.committing = false; + this.commitSeqno = 0; + this.commitStarted = -1; + this.commitFailures = 0; } @Override - public void start(Map<String, String> props) { - taskProps = props; - consumer = createConsumer(); - context = new WorkerSinkTaskContext(consumer); - - workThread = createWorkerThread(); - workThread.start(); + public void initialize(Map<String, String> taskConfig) { + this.taskConfig = taskConfig; + this.consumer = createConsumer(); + this.context = new 
WorkerSinkTaskContext(consumer); } @Override public void stop() { // Offset commit is handled upon exit in work thread - if (workThread != null) - workThread.startGracefulShutdown(); + super.stop(); consumer.wakeup(); } @Override - public boolean awaitStop(long timeoutMs) { - boolean success = true; - if (workThread != null) { - try { - success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); - if (!success) - workThread.forceShutdown(); - } catch (InterruptedException e) { - success = false; - } - } - task.stop(); - return success; - } - - @Override - public void close() { + protected void close() { // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout // passed in + task.stop(); if (consumer != null) consumer.close(); } + @Override + public void execute() { + initializeAndStart(); + try { + while (!isStopping()) + iteration(); + } finally { + // Make sure any uncommitted data has been committed and the task has + // a chance to clean up its state + closePartitions(); + } + } + + protected void iteration() { + long now = time.milliseconds(); + + // Maybe commit + if (!committing && now >= nextCommit) { + commitOffsets(now, false); + nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + } + + // Check for timed out commits + long commitTimeout = commitStarted + workerConfig.getLong( + WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + if (committing && now >= commitTimeout) { + log.warn("Commit of {} offsets timed out", this); + commitFailures++; + committing = false; + } + + // And process messages + long timeoutMs = Math.max(nextCommit - now, 0); + poll(timeoutMs); + } + + private void onCommitCompleted(Throwable error, long seqno) { + if (commitSeqno != seqno) { + log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", + this, + seqno, commitSeqno); + } else { + if (error != null) { + log.error("Commit of {} offsets threw an unexpected exception: ", this, error); + commitFailures++; + } else { + log.debug("Finished {} offset commit successfully in {} ms", + this, time.milliseconds() - commitStarted); + commitFailures = 0; + } + committing = false; + } + } + + public int commitFailures() { + return commitFailures; + } + /** - * Performs initial join process for consumer group, ensures we have an assignment, and initializes + starts the - * SinkTask. - * - * @returns true if successful, false if joining the consumer group was interrupted + * Initializes and starts the SinkTask. */ - public boolean joinConsumerGroupAndStart() { - String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG); + protected void initializeAndStart() { + String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) throw new ConnectException("Sink tasks require a list of topics."); String[] topics = topicsStr.split(","); log.debug("Task {} subscribing to topics {}", id, topics); consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); - - // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions - // to work with. Any rewinding will be handled immediately when polling starts. - try { - pollConsumer(0); - } catch (WakeupException e) { - log.error("Sink task {} was stopped before completing join group. 
Task initialization and start is being skipped", this); - return false; - } task.initialize(context); - task.start(taskProps); + task.start(taskConfig); log.info("Sink task {} finished initialization and start", this); - started = true; - return true; } /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ - public void poll(long timeoutMs) { + protected void poll(long timeoutMs) { try { rewind(); long retryTimeout = context.timeout(); @@ -183,55 +219,62 @@ class WorkerSinkTask implements WorkerTask { /** * Starts an offset commit by flushing outstanding messages from the task and then starting - * the write commit. This should only be invoked by the WorkerSinkTaskThread. + * the write commit. **/ - public void commitOffsets(boolean sync, final int seqno) { + private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) { log.info("{} Committing offsets", this); - - final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets); - - try { - task.flush(offsets); - } catch (Throwable t) { - log.error("Commit of {} offsets failed due to exception while flushing:", this, t); - log.error("Rewinding offsets to last committed offsets"); - for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { - log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); - consumer.seek(entry.getKey(), entry.getValue().offset()); - } - currentOffsets = new HashMap<>(lastCommittedOffsets); - workThread.onCommitCompleted(t, seqno); - return; - } - - if (sync) { + if (closing) { try { consumer.commitSync(offsets); lastCommittedOffsets = offsets; - workThread.onCommitCompleted(null, seqno); + onCommitCompleted(null, seqno); } catch (KafkaException e) { - workThread.onCommitCompleted(e, seqno); + onCommitCompleted(e, seqno); } } else { OffsetCommitCallback cb = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { lastCommittedOffsets = offsets; - workThread.onCommitCompleted(error, seqno); + onCommitCompleted(error, seqno); } }; consumer.commitAsync(offsets, cb); } } - public Time time() { - return time; - } + private void commitOffsets(long now, boolean closing) { + if (currentOffsets.isEmpty()) + return; - public WorkerConfig workerConfig() { - return workerConfig; + committing = true; + commitSeqno += 1; + commitStarted = now; + + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets); + try { + task.flush(offsets); + } catch (Throwable t) { + log.error("Commit of {} offsets failed due to exception while flushing:", this, t); + log.error("Rewinding offsets to last committed offsets"); + for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) { + log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); + consumer.seek(entry.getKey(), entry.getValue().offset()); + } + currentOffsets = new HashMap<>(lastCommittedOffsets); + onCommitCompleted(t, commitSeqno); + return; + } finally { + // Close the task if needed before committing the offsets. This is basically the last chance for + // the connector to actually flush data that has been written to it. 
+ if (closing) + task.close(currentOffsets.keySet()); + } + + doCommit(offsets, closing, commitSeqno); } + @Override public String toString() { return "WorkerSinkTask{" + @@ -277,10 +320,6 @@ class WorkerSinkTask implements WorkerTask { return newConsumer; } - private WorkerSinkTaskThread createWorkerThread() { - return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig); - } - private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { for (ConsumerRecord<byte[], byte[]> msg : msgs) { log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); @@ -321,8 +360,8 @@ class WorkerSinkTask implements WorkerTask { consumer.pause(tp); // Let this exit normally, the batch will be reprocessed on the next loop. } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id); - log.error("Task is being killed and will not recover until manually restarted:", t); + log.error("Task {} threw an uncaught and unrecoverable exception", id, t); + log.error("Task is being killed and will not recover until manually restarted"); throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception."); } } @@ -344,12 +383,20 @@ class WorkerSinkTask implements WorkerTask { context.clearOffsets(); } + private void openPartitions(Collection<TopicPartition> partitions) { + if (partitions.isEmpty()) + return; + + task.open(partitions); + } + + private void closePartitions() { + commitOffsets(time.milliseconds(), true); + } + private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - if (rebalanceException != null) - return; - lastCommittedOffsets = new HashMap<>(); currentOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { @@ -364,6 +411,7 @@ class WorkerSinkTask implements WorkerTask { // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own. if (pausedForRedelivery) { pausedForRedelivery = false; + Set<TopicPartition> assigned = new HashSet<>(partitions); Set<TopicPartition> taskPaused = context.pausedPartitions(); @@ -383,9 +431,9 @@ class WorkerSinkTask implements WorkerTask { // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (started) { + if (rebalanceException == null) { try { - task.onPartitionsAssigned(partitions); + openPartitions(partitions); } catch (RuntimeException e) { // The consumer swallows exceptions raised in the rebalance listener, so we need to store // exceptions and rethrow when poll() returns. @@ -396,15 +444,12 @@ class WorkerSinkTask implements WorkerTask { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - if (started) { - try { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); - } catch (RuntimeException e) { - // The consumer swallows exceptions raised in the rebalance listener, so we need to store - // exceptions and rethrow when poll() returns. - rebalanceException = e; - } + try { + closePartitions(); + } catch (RuntimeException e) { + // The consumer swallows exceptions raised in the rebalance listener, so we need to store + // exceptions and rethrow when poll() returns. 
+ rebalanceException = e; } // Make sure we don't have any leftover data since offsets will be reset to committed positions http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java deleted file mode 100644 index 93e210a..0000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.connect.runtime; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.connect.util.ShutdownableThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to - * simplify testing. - */ -class WorkerSinkTaskThread extends ShutdownableThread { - private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); - - private final WorkerSinkTask task; - private long nextCommit; - private boolean committing; - private int commitSeqno; - private long commitStarted; - private int commitFailures; - - public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, - WorkerConfig workerConfig) { - super(name); - this.task = task; - this.nextCommit = time.milliseconds() + - workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - this.committing = false; - this.commitSeqno = 0; - this.commitStarted = -1; - this.commitFailures = 0; - } - - @Override - public void execute() { - // Try to join and start. If we're interrupted before this completes, bail. 
- if (!task.joinConsumerGroupAndStart()) - return; - - while (getRunning()) { - iteration(); - } - - // Make sure any uncommitted data has committed - task.commitOffsets(true, -1); - } - - public void iteration() { - long now = task.time().milliseconds(); - - // Maybe commit - if (!committing && now >= nextCommit) { - committing = true; - commitSeqno += 1; - commitStarted = now; - task.commitOffsets(false, commitSeqno); - nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - } - - // Check for timed out commits - long commitTimeout = commitStarted + task.workerConfig().getLong( - WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); - if (committing && now >= commitTimeout) { - log.warn("Commit of {} offsets timed out", task); - commitFailures++; - committing = false; - } - - // And process messages - long timeoutMs = Math.max(nextCommit - now, 0); - task.poll(timeoutMs); - } - - public void onCommitCompleted(Throwable error, long seqno) { - if (commitSeqno != seqno) { - log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", - this, - seqno, commitSeqno); - } else { - if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", task, error); - commitFailures++; - } else { - log.debug("Finished {} offset commit successfully in {} ms", - task, task.time().milliseconds() - commitStarted); - commitFailures = 0; - } - committing = false; - } - } - - public int commitFailures() { - return commitFailures; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 6c61d79..30c2262 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -31,7 +31,6 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.connect.util.ShutdownableThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,20 +46,18 @@ import java.util.concurrent.TimeoutException; /** * WorkerTask that uses a SourceTask to ingest data into Kafka. 
*/ -class WorkerSourceTask implements WorkerTask { +class WorkerSourceTask extends WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); private static final long SEND_FAILED_BACKOFF_MS = 100; - private final ConnectorTaskId id; + private final WorkerConfig workerConfig; private final SourceTask task; private final Converter keyConverter; private final Converter valueConverter; private KafkaProducer<byte[], byte[]> producer; - private WorkerSourceTaskThread workThread; private final OffsetStorageReader offsetReader; private final OffsetStorageWriter offsetWriter; - private final WorkerConfig workerConfig; private final Time time; private List<SourceRecord> toSend; @@ -73,19 +70,28 @@ class WorkerSourceTask implements WorkerTask { private boolean flushing; private CountDownLatch stopRequestedLatch; - public WorkerSourceTask(ConnectorTaskId id, SourceTask task, - Converter keyConverter, Converter valueConverter, + private Map<String, String> taskConfig; + private boolean finishedStart = false; + private boolean startedShutdownBeforeStartCompleted = false; + + public WorkerSourceTask(ConnectorTaskId id, + SourceTask task, + Converter keyConverter, + Converter valueConverter, KafkaProducer<byte[], byte[]> producer, - OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, - WorkerConfig workerConfig, Time time) { - this.id = id; + OffsetStorageReader offsetReader, + OffsetStorageWriter offsetWriter, + WorkerConfig workerConfig, + Time time) { + super(id); + + this.workerConfig = workerConfig; this.task = task; this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.producer = producer; this.offsetReader = offsetReader; this.offsetWriter = offsetWriter; - this.workerConfig = workerConfig; this.time = time; this.toSend = null; @@ -97,37 +103,60 @@ class WorkerSourceTask implements WorkerTask { } @Override - public void start(Map<String, String> props) { - workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props); - workThread.start(); + public void initialize(Map<String, String> config) { + this.taskConfig = config; + } + + protected void close() { + // nothing to do } @Override public void stop() { - if (workThread != null) { - workThread.startGracefulShutdown(); - stopRequestedLatch.countDown(); + super.stop(); + stopRequestedLatch.countDown(); + synchronized (this) { + if (finishedStart) + task.stop(); + else + startedShutdownBeforeStartCompleted = true; } } @Override - public boolean awaitStop(long timeoutMs) { - boolean success = true; - if (workThread != null) { - try { - success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); - if (!success) - workThread.forceShutdown(); - } catch (InterruptedException e) { - success = false; + public void execute() { + try { + task.initialize(new WorkerSourceTaskContext(offsetReader)); + task.start(taskConfig); + log.info("Source task {} finished initialization and start", this); + synchronized (this) { + if (startedShutdownBeforeStartCompleted) { + task.stop(); + return; + } + finishedStart = true; } + + while (!isStopping()) { + if (toSend == null) + toSend = task.poll(); + if (toSend == null) + continue; + if (!sendRecords()) + stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + // Ignore and allow to exit. 
+ } catch (Throwable t) { + log.error("Task {} threw an uncaught and unrecoverable exception", id); + log.error("Task is being killed and will not recover until manually restarted:", t); + // It should still be safe to let this fall through and commit offsets since this exception would have + // simply resulted in not getting more records but all the existing records should be ok to flush + // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit + // to fail. } - return success; - } - @Override - public void close() { - // Nothing to do + commitOffsets(); } /** @@ -323,67 +352,6 @@ class WorkerSourceTask implements WorkerTask { flushing = false; } - - private class WorkerSourceTaskThread extends ShutdownableThread { - private Map<String, String> workerProps; - private boolean finishedStart; - private boolean startedShutdownBeforeStartCompleted; - - public WorkerSourceTaskThread(String name, Map<String, String> workerProps) { - super(name); - this.workerProps = workerProps; - this.finishedStart = false; - this.startedShutdownBeforeStartCompleted = false; - } - - @Override - public void execute() { - try { - task.initialize(new WorkerSourceTaskContext(offsetReader)); - task.start(workerProps); - log.info("Source task {} finished initialization and start", this); - synchronized (this) { - if (startedShutdownBeforeStartCompleted) { - task.stop(); - return; - } - finishedStart = true; - } - - while (getRunning()) { - if (toSend == null) - toSend = task.poll(); - if (toSend == null) - continue; - if (!sendRecords()) - stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); - } - } catch (InterruptedException e) { - // Ignore and allow to exit. - } catch (Throwable t) { - log.error("Task {} threw an uncaught and unrecoverable exception", id); - log.error("Task is being killed and will not recover until manually restarted:", t); - // It should still be safe to let this fall through and commit offsets since this exception would have - // simply resulted in not getting more records but all the existing records should be ok to flush - // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit - // to fail. - } - - commitOffsets(); - } - - @Override - public void startGracefulShutdown() { - super.startGracefulShutdown(); - synchronized (this) { - if (finishedStart) - task.stop(); - else - startedShutdownBeforeStartCompleted = true; - } - } - } - @Override public String toString() { return "WorkerSourceTask{" + http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 66fc45b..b4d427a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -17,25 +17,48 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Handles processing for an individual task. 
This interface only provides the basic methods * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with * Kafka to create a data flow. */ -interface WorkerTask { +abstract class WorkerTask implements Runnable { + private static final Logger log = LoggerFactory.getLogger(WorkerTask.class); + + protected final ConnectorTaskId id; + private final AtomicBoolean stopping; + private final AtomicBoolean running; + private final CountDownLatch shutdownLatch; + + public WorkerTask(ConnectorTaskId id) { + this.id = id; + this.stopping = new AtomicBoolean(false); + this.running = new AtomicBoolean(false); + this.shutdownLatch = new CountDownLatch(1); + } + /** - * Start the Task + * Initialize the task for execution. * @param props initial configuration */ - void start(Map<String, String> props); + public abstract void initialize(Map<String, String> props); /** * Stop this task from processing messages. This method does not block, it only triggers * shutdown. Use #{@link #awaitStop} to block until completion. */ - void stop(); + public void stop() { + this.stopping.set(true); + } /** * Wait for this task to finish stopping. @@ -43,12 +66,48 @@ interface WorkerTask { * @param timeoutMs * @return true if successful, false if the timeout was reached */ - boolean awaitStop(long timeoutMs); + public boolean awaitStop(long timeoutMs) { + if (!running.get()) + return true; + + try { + return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } + + protected abstract void execute(); + + protected abstract void close(); + + protected boolean isStopping() { + return stopping.get(); + } + + private void doClose() { + try { + close(); + } catch (Throwable t) { + log.error("Unhandled exception in task shutdown {}", id, t); + } finally { + running.set(false); + shutdownLatch.countDown(); + } + } + + @Override + public void run() { + if (!this.running.compareAndSet(false, true)) + throw new IllegalStateException("The task cannot be started while still running"); + + try { + execute(); + } catch (Throwable t) { + log.error("Unhandled exception in task {}", id, t); + } finally { + doClose(); + } + } - /** - * Close this task. This is different from #{@link #stop} and #{@link #awaitStop} in that the - * stop methods ensure processing has stopped but may leave resources allocated. This method - * should clean up all resources. 
- */ - void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 305a61e..04b08b3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -97,8 +97,6 @@ public class WorkerSinkTaskTest { @Mock private Converter valueConverter; @Mock - private WorkerSinkTaskThread workerThread; - @Mock private KafkaConsumer<byte[], byte[]> consumer; private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); @@ -116,7 +114,7 @@ public class WorkerSinkTaskTest { workerProps.put("internal.value.converter.schemas.enable", "false"); workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, + WorkerSinkTask.class, new String[]{"createConsumer"}, taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); recordsReturned = 0; @@ -125,6 +123,7 @@ public class WorkerSinkTaskTest { @Test public void testPollRedelivery() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime expectConsumerPoll(1); @@ -152,8 +151,9 @@ public class WorkerSinkTaskTest { PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + workerTask.poll(Long.MAX_VALUE); workerTask.poll(Long.MAX_VALUE); workerTask.poll(Long.MAX_VALUE); @@ -165,12 +165,14 @@ public class WorkerSinkTaskTest { RuntimeException exception = new RuntimeException("Revocation error"); expectInitializeTask(); + expectPollInitialAssignment(); expectRebalanceRevocationError(exception); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + workerTask.poll(Long.MAX_VALUE); try { workerTask.poll(Long.MAX_VALUE); fail("Poll should have raised the rebalance exception"); @@ -186,12 +188,14 @@ public class WorkerSinkTaskTest { RuntimeException exception = new RuntimeException("Assignment error"); expectInitializeTask(); + expectPollInitialAssignment(); expectRebalanceAssignmentError(exception); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + workerTask.poll(Long.MAX_VALUE); try { workerTask.poll(Long.MAX_VALUE); fail("Poll should have raised the rebalance exception"); @@ -205,24 +209,9 @@ public class WorkerSinkTaskTest { private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); - 
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer() throws Throwable { - rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); - return ConsumerRecords.empty(); - } - }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); PowerMock.expectLastCall(); sinkTask.start(TASK_PROPS); @@ -232,7 +221,7 @@ public class WorkerSinkTaskTest { private void expectRebalanceRevocationError(RuntimeException e) { final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsRevoked(partitions); + sinkTask.close(new HashSet<>(partitions)); EasyMock.expectLastCall().andThrow(e); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( @@ -248,7 +237,7 @@ public class WorkerSinkTaskTest { private void expectRebalanceAssignmentError(RuntimeException e) { final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); - sinkTask.onPartitionsRevoked(partitions); + sinkTask.close(new HashSet<>(partitions)); EasyMock.expectLastCall(); sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); @@ -257,13 +246,10 @@ public class WorkerSinkTaskTest { consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); EasyMock.expectLastCall(); - workerThread.onCommitCompleted(EasyMock.<Throwable>isNull(), EasyMock.anyLong()); - EasyMock.expectLastCall(); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - sinkTask.onPartitionsAssigned(partitions); + sinkTask.open(partitions); EasyMock.expectLastCall().andThrow(e); EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( @@ -277,6 +263,26 @@ public class WorkerSinkTaskTest { }); } + private void expectPollInitialAssignment() { + final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + sinkTask.open(partitions); + EasyMock.expectLastCall(); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + rebalanceListener.getValue().onPartitionsAssigned(partitions); + return ConsumerRecords.empty(); + } + }); + EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + + sinkTask.put(Collections.<SinkRecord>emptyList()); + EasyMock.expectLastCall(); + } + private void expectConsumerPoll(final int numMessages) { EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 6915631..3bf653e 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -55,8 +55,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -100,7 +100,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { private Converter valueConverter; private WorkerSinkTask workerTask; @Mock private KafkaConsumer<byte[], byte[]> consumer; - private WorkerSinkTaskThread workerThread; private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); private long recordsReturned; @@ -119,7 +118,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { workerProps.put("internal.value.converter.schemas.enable", "false"); workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, + WorkerSinkTask.class, new String[]{"createConsumer"}, taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); recordsReturned = 0; @@ -128,16 +127,22 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Test public void testPollsInBackground() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L); expectStopTask(10L); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + + // First iteration initializes partition assignment + workerTask.iteration(); + + // Then we iterate to fetch data for (int i = 0; i < 10; i++) { - workerThread.iteration(); + workerTask.iteration(); } workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); @@ -163,23 +168,28 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Test public void testCommit() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + // Make each poll() take the offset commit interval Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, null, 0, true); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // First iteration gets one record - workerThread.iteration(); - // Second triggers commit, gets a second offset - workerThread.iteration(); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + + // Initialize partition assignment + workerTask.iteration(); + // Fetch one record + workerTask.iteration(); + // Trigger the commit + workerTask.iteration(); + // Commit finishes synchronously for testing so we can check this immediately - assertEquals(0, workerThread.commitFailures()); + assertEquals(0, workerTask.commitFailures()); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -192,6 +202,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Test public void 
testCommitTaskFlushFailure() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, new RuntimeException(), null, 0, true); // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization @@ -203,17 +215,21 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); PowerMock.expectLastCall(); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers commit - workerThread.iteration(); - workerThread.iteration(); - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + + // Initialize partition assignment + workerTask.iteration(); + // Fetch some data + workerTask.iteration(); + // Trigger the commit + workerTask.iteration(); + + assertEquals(1, workerTask.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -226,6 +242,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { // Validate that we rewind to the correct offsets if a task's flush method throws an exception expectInitializeTask(); + expectPollInitialAssignment(); Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, null, 0, true); expectOffsetFlush(2L, new RuntimeException(), null, 0, true); @@ -237,18 +254,23 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); PowerMock.expectLastCall(); expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); PowerMock.replayAll(); - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers first commit, third iteration triggers second (failing) commit - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); + workerTask.initialize(TASK_PROPS); + workerTask.initializeAndStart(); + + // Initialize partition assignment + workerTask.iteration(); + // Fetch some data + workerTask.iteration(); + // Trigger first commit, + workerTask.iteration(); + // Trigger second (failing) commit + workerTask.iteration(); + + assertEquals(1, workerTask.commitFailures()); + assertEquals(false, Whitebox.getInternalState(workerTask, "committing")); workerTask.stop(); workerTask.awaitStop(Long.MAX_VALUE); workerTask.close(); @@ -259,22 +281,28 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Test public void testCommitConsumerFailure() throws Exception { expectInitializeTask(); + expectPollInitialAssignment(); + Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetFlush(1L, null, new Exception(), 0, true); expectStopTask(2); - 
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);

         PowerMock.replayAll();

-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers commit
-        workerThread.iteration();
-        workerThread.iteration();
+        workerTask.initialize(TASK_PROPS);
+        workerTask.initializeAndStart();
+
+        // Initialize partition assignment
+        workerTask.iteration();
+        // Fetch some data
+        workerTask.iteration();
+        // Trigger commit
+        workerTask.iteration();
+
         // TODO Response to consistent failures?
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        assertEquals(1, workerTask.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));

         workerTask.stop();
         workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
@@ -285,26 +313,32 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {

     @Test
     public void testCommitTimeout() throws Exception {
         expectInitializeTask();
+        expectPollInitialAssignment();
+
         // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
         Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
         expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
         expectStopTask(4);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);

         PowerMock.replayAll();

-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
-        // trigger another commit
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
+        workerTask.initialize(TASK_PROPS);
+        workerTask.initializeAndStart();
+
+        // Initialize partition assignment
+        workerTask.iteration();
+        // Fetch some data
+        workerTask.iteration();
+        workerTask.iteration();
+        // Trigger the commit
+        workerTask.iteration();
+        // Trigger the timeout without another commit
+        workerTask.iteration();
+
         // TODO Response to consistent failures?
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        assertEquals(1, workerTask.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));

         workerTask.stop();
         workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
@@ -366,15 +400,14 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         PowerMock.expectLastCall();

         expectStopTask(0);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);

         PowerMock.replayAll();

-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
+        workerTask.initialize(TASK_PROPS);
+        workerTask.initializeAndStart();
+        workerTask.iteration();
+        workerTask.iteration();
+        workerTask.iteration();
         workerTask.stop();
         workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
@@ -385,6 +418,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     @Test
     public void testRewind() throws Exception {
         expectInitializeTask();
+        expectPollInitialAssignment();
+
         final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();

@@ -410,14 +445,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         });

         expectStopTask(3);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
         PowerMock.replayAll();

-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        workerThread.iteration();
-        workerThread.iteration();
+        workerTask.initialize(TASK_PROPS);
+        workerTask.initializeAndStart();
+        workerTask.iteration();
+        workerTask.iteration();
+        workerTask.iteration();
         workerTask.stop();
         workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
@@ -428,21 +462,25 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {

     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
-        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
-                workerTask, "mock-worker-thread", time,
-                workerConfig);
-        PowerMock.expectPrivate(workerTask, "createWorkerThread")
-                .andReturn(workerThread);
-        workerThread.start();
+        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
         PowerMock.expectLastCall();
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
+    private void expectPollInitialAssignment() throws Exception {
+        final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3);
+
+        sinkTask.open(partitions);
+        EasyMock.expectLastCall();

         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+                rebalanceListener.getValue().onPartitionsAssigned(partitions);
                 return ConsumerRecords.empty();
             }
         });
@@ -450,15 +488,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
         EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);

-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
+        sinkTask.put(Collections.<SinkRecord>emptyList());
+        EasyMock.expectLastCall();
     }

     private void expectStopTask(final long expectedMessages) throws Exception {
-        final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
-
         sinkTask.stop();
         PowerMock.expectLastCall();

@@ -526,10 +560,10 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     }

     private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
-                                                              final RuntimeException flushError,
-                                                              final Exception consumerCommitError,
-                                                              final long consumerCommitDelayMs,
-                                                              final boolean invokeCallback)
+                                                            final RuntimeException flushError,
+                                                            final Exception consumerCommitError,
+                                                            final long consumerCommitDelayMs,
+                                                            final boolean invokeCallback)
             throws Exception {
         final long finalOffset = FIRST_OFFSET + expectedMessages;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index f16cbeb..3888534 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -52,6 +52,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -77,6 +79,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
     private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();

+    private ExecutorService executor = Executors.newSingleThreadExecutor();
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
     @Mock private SourceTask sourceTask;
@@ -132,7 +135,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {

         PowerMock.replayAll();

-        workerTask.start(EMPTY_TASK_PROPS);
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        executor.submit(workerTask);
         awaitPolls(pollLatch);
         workerTask.stop();
         assertEquals(true, workerTask.awaitStop(1000));
@@ -160,7 +164,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {

         PowerMock.replayAll();

-        workerTask.start(EMPTY_TASK_PROPS);
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        executor.submit(workerTask);
         awaitPolls(pollLatch);
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
@@ -189,7 +194,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {

         PowerMock.replayAll();

-        workerTask.start(EMPTY_TASK_PROPS);
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        executor.submit(workerTask);
         awaitPolls(pollLatch);
         assertFalse(workerTask.commitOffsets());
         workerTask.stop();
@@ -271,7 +277,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {

         PowerMock.replayAll();

-        workerTask.start(EMPTY_TASK_PROPS);
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        executor.submit(workerTask);
         // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
         // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
         // cannot be invoked immediately in the thread trying to stop the task.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d80f563/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 335e0ce..f33347a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -354,14 +354,13 @@ public class WorkerTest extends ThreadedTest {
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
-        workerTask.start(origProps);
+        workerTask.initialize(origProps);
         EasyMock.expectLastCall();

         // Remove
         workerTask.stop();
         EasyMock.expectLastCall();
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
-        workerTask.close();
         EasyMock.expectLastCall();

         offsetBackingStore.stop();
@@ -424,7 +423,7 @@ public class WorkerTest extends ThreadedTest {
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
-        workerTask.start(origProps);
+        workerTask.initialize(origProps);
         EasyMock.expectLastCall();

         // Remove on Worker.stop()
@@ -432,7 +431,6 @@
         EasyMock.expectLastCall();
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
-        workerTask.close();
         EasyMock.expectLastCall();

         offsetBackingStore.stop();
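----------------------------------------------------------------------

For connector authors adapting to this change, the following is a minimal sketch of a sink task written against the new open/close lifecycle. Only the SinkTask method signatures come from this patch; ExampleSinkTask and its PartitionWriter buffer are hypothetical stand-ins for whatever per-partition state a real connector keeps.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class ExampleSinkTask extends SinkTask {
        // Hypothetical per-partition buffer; a real task might hold file handles or connections.
        private static final class PartitionWriter {
            private final List<SinkRecord> buffered = new ArrayList<>();
            void buffer(SinkRecord record) { buffered.add(record); }
            void flush() { /* push buffered records downstream, then clear */ buffered.clear(); }
            void close() { buffered.clear(); }
        }

        private final Map<TopicPartition, PartitionWriter> writers = new HashMap<>();

        @Override
        public String version() { return "0.0.1"; }

        @Override
        public void start(Map<String, String> props) {
            // Accept configuration; no partitions are assigned at this point.
        }

        @Override
        public void open(Collection<TopicPartition> partitions) {
            // Replaces onPartitionsAssigned(): allocate state for the new assignment.
            for (TopicPartition tp : partitions)
                writers.put(tp, new PartitionWriter());
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // Only records for partitions opened above will arrive here.
            for (SinkRecord record : records)
                writers.get(new TopicPartition(record.topic(), record.kafkaPartition())).buffer(record);
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Called periodically so Connect can safely commit the given offsets.
            for (PartitionWriter writer : writers.values())
                writer.flush();
        }

        @Override
        public void close(Collection<TopicPartition> partitions) {
            // Replaces onPartitionsRevoked(): release state before the rebalance completes.
            for (TopicPartition tp : partitions) {
                PartitionWriter writer = writers.remove(tp);
                if (writer != null)
                    writer.close();
            }
        }

        @Override
        public void stop() {
            // Any open partitions have already been closed by the framework.
        }
    }

The design point the sketch illustrates is that per-partition state is created in open() and torn down in close(), so a rebalance cannot leak writers for partitions the task no longer owns.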

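The test changes in WorkerSourceTaskTest and WorkerTest also reflect the new threading model: a worker task is now a plain Runnable that the worker initializes, submits to an executor, and shuts down via stop(), awaitStop() and close(), replacing the dedicated WorkerSinkTaskThread. Below is a rough sketch of that driving sequence; the TaskRunner interface is hypothetical and mirrors only the calls the updated tests make.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class WorkerTaskLifecycleSketch {
        // Hypothetical interface mirroring the calls made in the updated tests.
        interface TaskRunner extends Runnable {
            void initialize(Map<String, String> props); // configure only; no work yet
            void stop();                                // ask the run loop to exit
            boolean awaitStop(long timeoutMs);          // true if it stopped in time
            void close();                               // release resources afterwards
        }

        static void drive(TaskRunner task) throws InterruptedException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                task.initialize(new HashMap<String, String>());
                executor.submit(task);          // run() drives the poll/process/commit loop
                Thread.sleep(1000);             // let the task do some work
                task.stop();
                if (!task.awaitStop(TimeUnit.SECONDS.toMillis(5)))
                    throw new IllegalStateException("task did not stop in time");
            } finally {
                task.close();
                executor.shutdown();
            }
        }
    }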