Repository: kafka Updated Branches: refs/heads/trunk 177dd7f21 -> eaabb6cd0
KAFKA-4593; Don't throw IllegalStateException and die on task migration

Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy <[email protected]>, Guozhang Wang <[email protected]>

Closes #3948 from mjsax/kafka-4593-illegal-state-exception-in-restore

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaabb6cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaabb6cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaabb6cd

Branch: refs/heads/trunk
Commit: eaabb6cd0173c4f6854eb5da39194a7e3fc0162c
Parents: 177dd7f
Author: Matthias J. Sax <[email protected]>
Authored: Fri Sep 29 10:00:13 2017 +0100
Committer: Damian Guy <[email protected]>
Committed: Fri Sep 29 10:00:13 2017 +0100

----------------------------------------------------------------------
 .../streams/errors/TaskMigratedException.java | 52 +++++++
 .../processor/internals/AssignedTasks.java | 89 +++++++---
 .../processor/internals/ChangelogReader.java | 2 +-
 .../processor/internals/PunctuationQueue.java | 6 +-
 .../processor/internals/RestoringTasks.java | 23 +++
 .../internals/StoreChangelogReader.java | 43 +++---
 .../streams/processor/internals/StreamTask.java | 111 ++++++++++-----
 .../processor/internals/StreamThread.java | 36 ++++-
 .../processor/internals/TaskManager.java | 33 ++++-
 .../processor/internals/AssignedTasksTest.java | 140 ++++++++++++-------
 .../internals/MockChangelogReader.java | 53 +++++++
 .../internals/ProcessorStateManagerTest.java | 1 -
 .../internals/StoreChangelogReaderTest.java | 87 +++++++---
 .../processor/internals/StreamThreadTest.java | 16 ++-
 .../processor/internals/TaskManagerTest.java | 4 +-
 .../apache/kafka/test/MockChangelogReader.java | 55 --------
 16 files changed, 530 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
new file mode 100644
index 0000000..f2fa594
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.internals.Task;
+
+/**
+ * Indicates that a task got migrated to another thread.
+ * Thus, the task raising this exception can be cleaned up and closed as "zombie".
+ */ +public class TaskMigratedException extends StreamsException { + + private final static long serialVersionUID = 1L; + + public TaskMigratedException(final Task task) { + this(task, null); + } + + public TaskMigratedException(final Task task, + final TopicPartition topicPartition, + final long endOffset, + final long pos) { + super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s", + topicPartition, + endOffset, + pos, + task.toString("> ")), + null); + } + + public TaskMigratedException(final Task task, + final Throwable throwable) { + super(task.toString(), throwable); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index fcb717d..4448a78 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -16,13 +16,12 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; @@ -38,7 +37,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -class AssignedTasks { +class AssignedTasks implements RestoringTasks { private final Logger log; private final String taskTypeName; private final TaskAction maybeCommitAction; @@ -51,6 +50,7 @@ class AssignedTasks { // IQ may access this map. private Map<TaskId, Task> running = new ConcurrentHashMap<>(); private Map<TopicPartition, Task> runningByPartition = new HashMap<>(); + private Map<TopicPartition, Task> restoringByPartition = new HashMap<>(); private int committed = 0; @@ -121,7 +121,7 @@ class AssignedTasks { try { if (!entry.getValue().initialize()) { log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey()); - restoring.put(entry.getKey(), entry.getValue()); + addToRestoring(entry.getValue()); } else { transitionToRunning(entry.getValue()); } @@ -188,6 +188,7 @@ class AssignedTasks { restoring.clear(); created.clear(); runningByPartition.clear(); + restoringByPartition.clear(); return firstException.get(); } @@ -213,11 +214,8 @@ class AssignedTasks { try { task.suspend(); suspended.put(task.id(), task); - } catch (final CommitFailedException e) { - suspended.put(task.id(), task); - // commit failed during suspension. Just log it. 
- log.warn("Failed to commit {} {} state when suspending due to CommitFailedException", taskTypeName, task.id()); - } catch (final ProducerFencedException e) { + } catch (final TaskMigratedException closeAsZombieAndSwallow) { + // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on closeZombieTask(task); it.remove(); } catch (final RuntimeException e) { @@ -236,11 +234,11 @@ class AssignedTasks { } private void closeZombieTask(final Task task) { - log.warn("Producer of task {} fenced; closing zombie task", task.id()); + log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id()); try { task.close(false, true); } catch (final Exception e) { - log.warn("{} Failed to close zombie due to {}, ignore and proceed", taskTypeName, e); + log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage()); } } @@ -248,13 +246,22 @@ class AssignedTasks { return !running.isEmpty(); } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) { if (suspended.containsKey(taskId)) { final Task task = suspended.get(taskId); log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); - task.resume(); + try { + task.resume(); + } catch (final TaskMigratedException e) { + closeZombieTask(task); + suspended.remove(taskId); + throw e; + } transitionToRunning(task); log.trace("resuming suspended {} {}", taskTypeName, task.id()); return true; @@ -265,6 +272,16 @@ class AssignedTasks { return false; } + private void addToRestoring(final Task task) { + restoring.put(task.id(), task); + for (TopicPartition topicPartition : task.partitions()) { + restoringByPartition.put(topicPartition, task); + } + for (TopicPartition topicPartition : task.changelogPartitions()) { + restoringByPartition.put(topicPartition, task); + } + } + private void transitionToRunning(final Task task) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); @@ -276,6 +293,11 @@ class AssignedTasks { } } + @Override + public Task restoringTaskFor(final TopicPartition partition) { + return restoringByPartition.get(partition); + } + Task runningTaskFor(final TopicPartition partition) { return runningByPartition.get(partition); } @@ -332,6 +354,7 @@ class AssignedTasks { void clear() { runningByPartition.clear(); + restoringByPartition.clear(); running.clear(); created.clear(); suspended.clear(); @@ -342,25 +365,42 @@ class AssignedTasks { return previousActiveTasks; } + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ int commit() { applyToRunningTasks(commitAction); return running.size(); } + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ int maybeCommit() { committed = 0; applyToRunningTasks(maybeCommitAction); return committed; } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ int process() { int processed = 0; - for (final Task task : running.values()) { + final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator(); + while (it.hasNext()) { + final Task task = it.next().getValue(); try { if (task.process()) { processed++; } - } catch (RuntimeException e) { + } catch (final 
TaskMigratedException e) { + closeZombieTask(task); + it.remove(); + throw e; + } catch (final RuntimeException e) { log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e); throw e; } @@ -368,9 +408,14 @@ class AssignedTasks { return processed; } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ int punctuate() { int punctuated = 0; - for (Task task : running.values()) { + final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator(); + while (it.hasNext()) { + final Task task = it.next().getValue(); try { if (task.maybePunctuateStreamTime()) { punctuated++; @@ -378,7 +423,11 @@ class AssignedTasks { if (task.maybePunctuateSystemTime()) { punctuated++; } - } catch (KafkaException e) { + } catch (final TaskMigratedException e) { + closeZombieTask(task); + it.remove(); + throw e; + } catch (final KafkaException e) { log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e); throw e; } @@ -393,12 +442,12 @@ class AssignedTasks { final Task task = it.next(); try { action.apply(task); - } catch (final CommitFailedException e) { - // commit failed. This is already logged inside the task as WARN and we can just log it again here. - log.warn("Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", taskTypeName, task.id(), action.name()); - } catch (final ProducerFencedException e) { + } catch (final TaskMigratedException e) { closeZombieTask(task); it.remove(); + if (firstException == null) { + firstException = e; + } } catch (final RuntimeException t) { log.error("Failed to {} {} {} due to the following error:", action.name(), http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 5ebc34c..ed07aa7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -37,7 +37,7 @@ public interface ChangelogReader { * Restore all registered state stores by reading from their changelogs. * @return all topic partitions that have been restored */ - Collection<TopicPartition> restore(); + Collection<TopicPartition> restore(final RestoringTasks active); /** * @return the restored offsets for all persistent stores. 
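The ChangelogReader change above is the pivot of the fix: restore() now receives the currently restoring tasks, so an inconsistent changelog can be attributed to a concrete task instead of killing the thread with an IllegalStateException. Below is a minimal sketch of a caller satisfying the new contract; the class RestorePhaseSketch and its method names are invented for illustration, and the bookkeeping mirrors the restoringByPartition map that AssignedTasks.addToRestoring() maintains above.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.errors.TaskMigratedException;
    import org.apache.kafka.streams.processor.internals.ChangelogReader;
    import org.apache.kafka.streams.processor.internals.RestoringTasks;
    import org.apache.kafka.streams.processor.internals.Task;

    class RestorePhaseSketch implements RestoringTasks {
        private final Map<TopicPartition, Task> restoringByPartition = new HashMap<>();

        // index a task by its input and changelog partitions before restoring it
        void track(final Task task) {
            for (final TopicPartition partition : task.partitions()) {
                restoringByPartition.put(partition, task);
            }
            for (final TopicPartition partition : task.changelogPartitions()) {
                restoringByPartition.put(partition, task);
            }
        }

        @Override
        public Task restoringTaskFor(final TopicPartition partition) {
            return restoringByPartition.get(partition);
        }

        void restoreAll(final ChangelogReader changelogReader) {
            try {
                changelogReader.restore(this);
            } catch (final TaskMigratedException e) {
                // another thread wrote to a changelog being restored, i.e., the
                // owning task is a zombie; production code closes it uncleanly
                // and rethrows so the stream thread can rejoin the consumer group
                throw e;
            }
        }
    }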
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java index ec047e6..80eda6c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; @@ -38,7 +39,10 @@ public class PunctuationQueue { } } - public boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) { + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ + boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) { synchronized (pq) { boolean punctuated = false; PunctuationSchedule top = pq.peek(); http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java new file mode 100644 index 0000000..6ed28fd --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; + +public interface RestoringTasks { + Task restoringTaskFor(final TopicPartition partition); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 4ba860d..cc298e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.slf4j.Logger; @@ -48,8 +49,7 @@ public class StoreChangelogReader implements ChangelogReader { private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>(); private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>(); - public StoreChangelogReader(final String threadId, - final Consumer<byte[], byte[]> consumer, + public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final StateRestoreListener userStateRestoreListener, final LogContext logContext) { this.consumer = consumer; @@ -57,12 +57,6 @@ public class StoreChangelogReader implements ChangelogReader { this.userStateRestoreListener = userStateRestoreListener; } - public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, - final StateRestoreListener stateRestoreListener, - final LogContext logContext) { - this("", consumer, stateRestoreListener, logContext); - } - @Override public void register(final StateRestorer restorer) { restorer.setUserRestoreListener(userStateRestoreListener); @@ -70,7 +64,10 @@ public class StoreChangelogReader implements ChangelogReader { needsInitializing.put(restorer.partition(), restorer); } - public Collection<TopicPartition> restore() { + /** + * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored + */ + public Collection<TopicPartition> restore(final RestoringTasks active) { if (!needsInitializing.isEmpty()) { initialize(); } @@ -83,7 +80,7 @@ public class StoreChangelogReader implements ChangelogReader { final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet()); final ConsumerRecords<byte[], byte[]> allRecords = consumer.poll(10); for (final TopicPartition partition : partitions) { - restorePartition(allRecords, partition); + restorePartition(allRecords, partition, active.restoringTaskFor(partition)); } if (needsRestoring.isEmpty()) { @@ -230,19 +227,19 @@ public class StoreChangelogReader implements ChangelogReader { needsInitializing.clear(); } + /** + * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored + */ private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords, - final TopicPartition topicPartition) { + final TopicPartition topicPartition, + final Task task) { final StateRestorer restorer = 
stateRestorers.get(topicPartition); final Long endOffset = endOffsets.get(topicPartition); final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset); restorer.setRestoredOffset(pos); if (restorer.hasCompleted(pos, endOffset)) { if (pos > endOffset + 1) { - throw new IllegalStateException( - String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d", - topicPartition, - endOffset, - pos)); + throw new TaskMigratedException(task, topicPartition, endOffset, pos); } log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}", @@ -260,12 +257,11 @@ public class StoreChangelogReader implements ChangelogReader { final StateRestorer restorer, final Long endOffset) { final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(); - long nextPosition = -1; + long offset = -1; for (final ConsumerRecord<byte[], byte[]> record : records) { - final long offset = record.offset(); + offset = record.offset(); if (restorer.hasCompleted(offset, endOffset)) { - nextPosition = record.offset(); break; } if (record.key() != null) { @@ -273,17 +269,16 @@ public class StoreChangelogReader implements ChangelogReader { } } - if (nextPosition == -1) { - nextPosition = consumer.position(restorer.partition()); + if (offset == -1) { + offset = consumer.position(restorer.partition()); } if (!restoreRecords.isEmpty()) { restorer.restore(restoreRecords); - restorer.restoreBatchCompleted(nextPosition, records.size()); - + restorer.restoreBatchCompleted(offset + 1, records.size()); } - return nextPosition; + return consumer.position(restorer.partition()); } private boolean hasPartition(final TopicPartition topicPartition) { http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8c26fa9..3d6c9b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; @@ -99,6 +100,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @param metrics the {@link StreamsMetrics} created by the thread * @param stateDirectory the {@link StateDirectory} created by the thread * @param producer the instance of {@link Producer} used to produce records + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public StreamTask(final TaskId id, final String applicationId, @@ -145,8 +147,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator stateMgr.registerGlobalStateStores(topology.globalStateStores()); if (eosEnabled) { - this.producer.initTransactions(); - this.producer.beginTransaction(); + try { + 
this.producer.initTransactions(); + this.producer.beginTransaction(); + } catch (final ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); + } transactionInFlight = true; } } @@ -167,12 +173,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * - re-initialize the task * - if (eos) begin new transaction * </pre> + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ @Override public void resume() { log.debug("Resuming"); if (eosEnabled) { - producer.beginTransaction(); + try { + producer.beginTransaction(); + } catch (final ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); + } transactionInFlight = true; } initTopology(); @@ -182,6 +193,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * Process one record. * * @return true if this method processes a record, false if it does not process a record. + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ @SuppressWarnings("unchecked") public boolean process() { @@ -214,6 +226,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator if (recordInfo.queue().size() == maxBufferedSize) { consumer.resume(singleton(partition)); } + } catch (final ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); } catch (final KafkaException e) { throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", id(), @@ -231,6 +245,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator /** * @throws IllegalStateException if the current node is not null + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ @Override public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) { @@ -246,6 +261,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator try { node.punctuate(timestamp, punctuator); + } catch (final ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); } catch (final KafkaException e) { throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e); } finally { @@ -264,12 +281,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * - if(!eos) write checkpoint * - commit offsets and start new transaction * </pre> + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ @Override public void commit() { commit(true); } + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ // visible for testing void commit(final boolean startNewTransaction) { log.debug("Committing"); @@ -299,44 +322,51 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator protected void flushState() { log.trace("Flushing state and producer"); super.flushState(); - recordCollector.flush(); + try { + recordCollector.flush(); + } catch (final ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); + } } + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ private void commitOffsets(final boolean startNewTransaction) { - if (commitOffsetNeeded) { - log.trace("Committing 
offsets"); - final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); - for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { - final TopicPartition partition = entry.getKey(); - final long offset = entry.getValue() + 1; - consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); - stateMgr.putOffsetLimit(partition, offset); - } - - if (eosEnabled) { - producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); - producer.commitTransaction(); - transactionInFlight = false; - if (startNewTransaction) { - transactionInFlight = true; - producer.beginTransaction(); + try { + if (commitOffsetNeeded) { + log.trace("Committing offsets"); + final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); + for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { + final TopicPartition partition = entry.getKey(); + final long offset = entry.getValue() + 1; + consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); + stateMgr.putOffsetLimit(partition, offset); } - } else { - try { + + if (eosEnabled) { + producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); + producer.commitTransaction(); + transactionInFlight = false; + if (startNewTransaction) { + transactionInFlight = true; + producer.beginTransaction(); + } + } else { consumer.commitSync(consumedOffsetsAndMetadata); - } catch (final CommitFailedException e) { - log.warn("Failed offset commits {} due to CommitFailedException", consumedOffsetsAndMetadata); - throw e; } + commitOffsetNeeded = false; + } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case + producer.commitTransaction(); + transactionInFlight = false; } - commitOffsetNeeded = false; - } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case - producer.commitTransaction(); - transactionInFlight = false; + } catch (final CommitFailedException | ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); } } - void initTopology() { + private void initTopology() { // initialize the task by initializing all its processor nodes in the topology log.trace("Initializing processor nodes of the topology"); for (final ProcessorNode node : topology.processors()) { @@ -357,6 +387,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * - if (!eos) write checkpoint * - commit offsets * </pre> + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ @Override public void suspend() { @@ -372,6 +404,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * - if (!eos) write checkpoint * - commit offsets * </pre> + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ // visible for testing void suspend(final boolean clean) { @@ -433,7 +467,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator producer.abortTransaction(); } transactionInFlight = false; - } catch (final ProducerFencedException e) { + } catch (final ProducerFencedException ignore) { + /* TODO + * this should actually never happen atm as we we guard the call to #abortTransaction + * -> the reason for the guard is a "bug" in the Producer -- it 
throws IllegalStateException + * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got + * fixed and fall-back to this catch-and-swallow code + */ + // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens } } @@ -470,7 +511,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * </pre> * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} -- * otherwise, just close open resources - * @param isZombie {@code true} is this task is a zombie or not + * @param isZombie {@code true} is this task is a zombie or not (this will repress {@link TaskMigratedException} + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ @Override public void close(boolean clean, @@ -550,6 +593,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * Possibly trigger registered stream-time punctuation functions if * current partition group timestamp has reached the defined stamp * Note, this is only called in the presence of new records + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public boolean maybePunctuateStreamTime() { final long timestamp = partitionGroup.timestamp(); @@ -567,6 +611,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * Possibly trigger registered system-time punctuation functions if * current system timestamp has reached the defined stamp * Note, this is called irrespective of the presence of new records + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public boolean maybePunctuateSystemTime() { final long timestamp = time.milliseconds(); http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 8d13558..ea7d362 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -43,6 +42,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskIdFormatException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; @@ -291,7 +291,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { taskManager.suspendTasksAndState(); } catch (final Throwable t) { log.error("Error caught during partition revocation, " + - "will abort the current process and 
re-throw at the end of rebalance: {}", t.getMessage()); + "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage()); streamThread.setRebalanceException(t); } finally { streamThread.refreshMetadataState(); @@ -339,6 +339,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { this.log = log; } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) { final List<Task> createdTasks = new ArrayList<>(); for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) { @@ -391,6 +394,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { this.threadClientId = threadClientId; } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ @Override StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) { taskCreatedSensor.record(); @@ -664,8 +670,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { log.info("Creating restore consumer client"); final Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId); final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs); - final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId, - restoreConsumer, + final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext); @@ -765,13 +770,22 @@ public class StreamThread extends Thread implements ThreadDataProvider { consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); while (isRunning()) { - recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); + try { + recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); + } catch (final TaskMigratedException ignoreAndRejoinGroup) { + log.warn("Detected a task that got migrated to another thread. " + + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + + "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup); + } } } /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored + * or if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ // Visible for testing long runOnce(final long recordsProcessedBeforeCommit) { @@ -811,6 +825,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { /** * Get the next batch of records by polling. * @return Next batch of records or null if no records available. 
+ * @throws TaskMigratedException if the task producer got fenced (EOS only) */ private ConsumerRecords<byte[], byte[]> pollRequests() { ConsumerRecords<byte[], byte[]> records = null; @@ -822,7 +837,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { } if (rebalanceException != null) { - if (!(rebalanceException instanceof ProducerFencedException)) { + if (rebalanceException instanceof TaskMigratedException) { + throw (TaskMigratedException) rebalanceException; + } else { throw new StreamsException(logPrefix + "Failed to rebalance.", rebalanceException); } } @@ -895,6 +912,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { * @param recordsProcessedBeforeCommit number of records to be processed before commit is called. * if UNLIMITED_RECORDS, then commit is never called * @return Number of records processed since last commit. + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ private long processAndMaybeCommit(final long recordsProcessedBeforeCommit) { @@ -926,6 +945,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { return totalProcessedSinceLastMaybeCommit; } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void punctuate() { final int punctuated = taskManager.punctuate(); if (punctuated > 0) { @@ -966,6 +988,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { /** * Commit all tasks owned by this thread if specified interval time has elapsed + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) */ void maybeCommit(final long now) { if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 278957e..652f4e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; @@ -67,6 +68,9 @@ class TaskManager { this.log = logContext.logger(getClass()); } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ void createTasks(final Collection<TopicPartition> assignment) { if (threadMetadataProvider == null) { throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. 
This should not happen."); @@ -92,6 +96,9 @@ class TaskManager { this.threadMetadataProvider = threadMetadataProvider; } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void addStreamTasks(final Collection<TopicPartition> assignment) { Map<TaskId, Set<TopicPartition>> assignedTasks = threadMetadataProvider.activeTasks(); if (assignedTasks.isEmpty()) { @@ -132,6 +139,9 @@ class TaskManager { } } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void addStandbyTasks() { final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = threadMetadataProvider.standbyTasks(); if (assignedStandbyTasks.isEmpty()) { @@ -177,6 +187,7 @@ class TaskManager { /** * Similar to shutdownTasksAndState, however does not close the task managers, in the hope that * soon the tasks will be assigned again + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ void suspendTasksAndState() { log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds()); @@ -188,8 +199,9 @@ class TaskManager { // remove the changelog partitions from restore consumer restoreConsumer.assign(Collections.<TopicPartition>emptyList()); - if (firstException.get() != null) { - throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get()); + final Exception exception = firstException.get(); + if (exception != null) { + throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception); } } @@ -242,12 +254,13 @@ class TaskManager { /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored */ boolean updateNewAndRestoringTasks() { active.initializeNewTasks(); standby.initializeNewTasks(); - final Collection<TopicPartition> restored = changelogReader.restore(); + final Collection<TopicPartition> restored = changelogReader.restore(active); final Set<TopicPartition> resumed = active.updateRestored(restored); if (!resumed.isEmpty()) { @@ -288,19 +301,33 @@ class TaskManager { } } + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ int commitAll() { int committed = active.commit(); return committed + standby.commit(); } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ int process() { return active.process(); } + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ int punctuate() { return active.punctuate(); } + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ int maybeCommitActiveTasks() { return active.maybeCommit(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java index 7d6bb3a..9d6aea1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java @@ -17,11 +17,10 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; import org.easymock.EasyMock; import org.junit.Before; @@ -91,6 +90,8 @@ public class AssignedTasksTest { @Test public void shouldInitializeNewTasks() { EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); EasyMock.replay(t1); addAndInitTask(); @@ -101,6 +102,8 @@ public class AssignedTasksTest { @Test public void shouldMoveInitializedTasksNeedingRestoreToRestoring() { EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); EasyMock.expect(t2.initialize()).andReturn(true); EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)); EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); @@ -152,7 +155,7 @@ public class AssignedTasksTest { mockRunningTaskSuspension(); EasyMock.replay(t1); - suspendTask(); + assertThat(suspendTask(), nullValue()); assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1))); EasyMock.verify(t1); @@ -161,11 +164,13 @@ public class AssignedTasksTest { @Test public void shouldCloseRestoringTasks() { EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); t1.close(false, false); EasyMock.expectLastCall(); EasyMock.replay(t1); - suspendTask(); + assertThat(suspendTask(), nullValue()); EasyMock.verify(t1); } @@ -176,7 +181,7 @@ public class AssignedTasksTest { EasyMock.replay(t1); assignedTasks.addNewTask(t1); - assignedTasks.suspend(); + assertThat(assignedTasks.suspend(), nullValue()); EasyMock.verify(t1); } @@ -186,8 +191,8 @@ public class AssignedTasksTest { mockRunningTaskSuspension(); EasyMock.replay(t1); - suspendTask(); - assignedTasks.suspend(); + assertThat(suspendTask(), nullValue()); + assertThat(assignedTasks.suspend(), nullValue()); EasyMock.verify(t1); } @@ -207,15 +212,14 @@ public class AssignedTasksTest { } @Test - public void shouldCloseTaskOnSuspendWhenProducerFencedException() { + public void shouldCloseTaskOnSuspendIfTaskMigratedException() { mockTaskInitialization(); t1.suspend(); - EasyMock.expectLastCall().andThrow(new ProducerFencedException("KABOOM!")); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); - assertThat(suspendTask(), nullValue()); assertTrue(assignedTasks.previousTaskIds().isEmpty()); EasyMock.verify(t1); @@ -228,13 +232,32 @@ public class AssignedTasksTest { EasyMock.expectLastCall(); EasyMock.replay(t1); - suspendTask(); + assertThat(suspendTask(), nullValue()); assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, 
Collections.singleton(tp1))); assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1))); EasyMock.verify(t1); } + @Test + public void shouldCloseTaskOnResumeIfTaskMigratedException() { + mockRunningTaskSuspension(); + t1.resume(); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); + t1.close(false, true); + EasyMock.expectLastCall(); + EasyMock.replay(t1); + + assertThat(suspendTask(), nullValue()); + + try { + assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)); + fail("Should have thrown TaskMigratedException."); + } catch (final TaskMigratedException expected) { /* ignore */ } + + assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET)); + EasyMock.verify(t1); + } private void mockTaskInitialization() { EasyMock.expect(t1.initialize()).andReturn(true); @@ -256,29 +279,21 @@ public class AssignedTasksTest { } @Test - public void shouldCloseTaskOnCommitIfProduceFencedException() { + public void shouldCloseTaskOnCommitIfTaskMigratedException() { mockTaskInitialization(); t1.commit(); - EasyMock.expectLastCall().andThrow(new ProducerFencedException("")); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); addAndInitTask(); - assignedTasks.commit(); - EasyMock.verify(t1); - } - - @Test - public void shouldNotThrowCommitFailedExceptionOnCommit() { - mockTaskInitialization(); - t1.commit(); - EasyMock.expectLastCall().andThrow(new CommitFailedException()); - EasyMock.replay(t1); - addAndInitTask(); + try { + assignedTasks.commit(); + fail("Should have thrown TaskMigratedException."); + } catch (final TaskMigratedException expected) { /* ignore */ } - assignedTasks.commit(); - assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1))); + assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET)); EasyMock.verify(t1); } @@ -315,66 +330,91 @@ public class AssignedTasksTest { } @Test - public void shouldCloseTaskOnMaybeCommitIfProduceFencedException() { + public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { mockTaskInitialization(); EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); - EasyMock.expectLastCall().andThrow(new ProducerFencedException("")); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); addAndInitTask(); - assignedTasks.maybeCommit(); + try { + assignedTasks.maybeCommit(); + fail("Should have thrown TaskMigratedException."); + } catch (final TaskMigratedException expected) { /* ignore */ } + + assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET)); EasyMock.verify(t1); } @Test - public void shouldNotThrowCommitFailedExceptionOnMaybeCommit() { + public void shouldCloseTaskOnProcessesIfTaskMigratedException() { mockTaskInitialization(); - EasyMock.expect(t1.commitNeeded()).andReturn(true); - t1.commit(); - EasyMock.expectLastCall().andThrow(new CommitFailedException()); + t1.process(); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); + t1.close(false, true); + EasyMock.expectLastCall(); EasyMock.replay(t1); addAndInitTask(); - assignedTasks.maybeCommit(); - assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1))); + try { + assignedTasks.process(); + fail("Should have thrown TaskMigratedException."); + } catch (final TaskMigratedException expected) { /* ignore */ } + + 
assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET)); EasyMock.verify(t1); } @Test - public void shouldThrowExceptionOnMaybeCommitWhenNotCommitFailedOrProducerFenced() { + public void shouldPunctuateRunningTasks() { mockTaskInitialization(); - EasyMock.expect(t1.commitNeeded()).andReturn(true); - t1.commit(); - EasyMock.expectLastCall().andThrow(new RuntimeException("")); + EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true); + EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true); EasyMock.replay(t1); addAndInitTask(); - try { - assignedTasks.maybeCommit(); - fail("Should have thrown exception"); - } catch (Exception e) { - // ok - } - assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1))); + assertThat(assignedTasks.punctuate(), equalTo(2)); EasyMock.verify(t1); } + @Test + public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() { + mockTaskInitialization(); + t1.maybePunctuateStreamTime(); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); + t1.close(false, true); + EasyMock.expectLastCall(); + EasyMock.replay(t1); + addAndInitTask(); + + try { + assignedTasks.punctuate(); + fail("Should have thrown TaskMigratedException."); + } catch (final TaskMigratedException expected) { /* ignore */ } + assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET)); + EasyMock.verify(t1); + } @Test - public void shouldPunctuateRunningTasks() { + public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() { mockTaskInitialization(); EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true); - EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true); + t1.maybePunctuateSystemTime(); + EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1)); + t1.close(false, true); + EasyMock.expectLastCall(); EasyMock.replay(t1); - addAndInitTask(); - assertThat(assignedTasks.punctuate(), equalTo(2)); + try { + assignedTasks.punctuate(); + fail("Should have thrown TaskMigratedException."); + } catch (final TaskMigratedException expected) { /* ignore */ } EasyMock.verify(t1); } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java new file mode 100644 index 0000000..6c3be61 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class MockChangelogReader implements ChangelogReader { + private final Set<TopicPartition> registered = new HashSet<>(); + + @Override + public void register(final StateRestorer restorer) { + registered.add(restorer.partition()); + } + + @Override + public Collection<TopicPartition> restore(final RestoringTasks active) { + return registered; + } + + @Override + public Map<TopicPartition, Long> restoredOffsets() { + return Collections.emptyMap(); + } + + @Override + public void reset() { + registered.clear(); + } + + public boolean wasRegistered(final TopicPartition partition) { + return registered.contains(partition); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index ede6dd4..dc009f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockBatchingStateRestoreListener; -import org.apache.kafka.test.MockChangelogReader; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 5da0a64..3c54851 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -25,12 +25,17 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.MockStateRestoreListener; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import java.util.Collection; import java.util.Collections; @@ -41,13 +46,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; import static 
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ede6dd4..dc009f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockChangelogReader;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
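Threaded through every call site below is the signature change this patch hinges on: ChangelogReader.restore() now receives a RestoringTasks callback so the reader can map a changelog partition back to the task restoring it, which is what lets it raise TaskMigratedException against the right task. A sketch of the shape, with stand-in types where this email does not show the real ones:

    import java.util.Collection;

    interface RestoreShapesSketch {
        interface Partition { }   // stand-in for o.a.k.common.TopicPartition
        interface ActiveTask { }  // stand-in for the internal task type

        // Stand-in for the new RestoringTasks interface added by this commit.
        interface RestoringTasksLike {
            ActiveTask restoringTaskFor(Partition partition);
        }

        interface ChangelogReaderLike {
            // before this commit: Collection<Partition> restore();
            Collection<Partition> restore(RestoringTasksLike active);
        }
    }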
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 5da0a64..3c54851 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -25,12 +25,17 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
 import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -41,13 +46,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(EasyMockRunner.class)
 public class StoreChangelogReaderTest {
 
+    @Mock(type = MockType.NICE)
+    private RestoringTasks active;
+    @Mock(type = MockType.NICE)
+    private Task task;
+
     private final MockStateRestoreListener callback = new MockStateRestoreListener();
     private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -75,7 +88,7 @@ public class StoreChangelogReaderTest {
         final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertTrue(functionCalled.get());
     }
 
@@ -83,7 +96,7 @@ public class StoreChangelogReaderTest {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         consumer.subscribe(Collections.singleton("sometopic"));
         try {
-            changelogReader.restore();
+            changelogReader.restore(active);
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException e) {
             // ok
@@ -96,7 +109,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
@@ -107,7 +120,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(5));
     }
 
@@ -118,7 +131,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
 
@@ -128,7 +141,7 @@ public class StoreChangelogReaderTest {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true, "storeName");
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
     }
@@ -150,7 +163,10 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        changelogReader.restore();
+        expect(active.restoringTaskFor(one)).andReturn(null);
+        expect(active.restoringTaskFor(two)).andReturn(null);
+        replay(active);
+        changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
@@ -174,7 +190,10 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        changelogReader.restore();
+        expect(active.restoringTaskFor(one)).andReturn(null);
+        expect(active.restoringTaskFor(two)).andReturn(null);
+        replay(active);
+        changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
@@ -199,7 +218,7 @@ public class StoreChangelogReaderTest {
     private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener,
-                                                        long startOffset,
+                                                        final long startOffset,
                                                         final long batchOffset,
                                                         final long endOffset) {
         assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
@@ -215,7 +234,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(0L));
     }
@@ -230,7 +249,7 @@ public class StoreChangelogReaderTest {
 
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(endOffset));
     }
@@ -240,7 +259,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
     }
@@ -250,7 +269,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
@@ -265,7 +284,7 @@ public class StoreChangelogReaderTest {
         consumer.assign(Collections.singletonList(topicPartition));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
     }
 
@@ -275,7 +294,7 @@ public class StoreChangelogReaderTest {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store"));
-        final Collection<TopicPartition> restored = changelogReader.restore();
+        final Collection<TopicPartition> restored = changelogReader.restore(active);
         assertThat(restored, equalTo(expected));
     }
 
@@ -288,11 +307,16 @@ public class StoreChangelogReaderTest {
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
-        assertTrue(changelogReader.restore().isEmpty());
+        final TopicPartition postInitialization = new TopicPartition("other", 0);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
+        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
+        replay(active);
+
+        assertTrue(changelogReader.restore(active).isEmpty());
 
         addRecords(9, topicPartition, 1);
 
-        final TopicPartition postInitialization = new TopicPartition("other", 0);
         setupConsumer(3, postInitialization);
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
@@ -302,24 +326,45 @@ public class StoreChangelogReaderTest {
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
         consumer.assign(expected);
 
-        assertThat(changelogReader.restore(), equalTo(expected));
+        assertThat(changelogReader.restore(active), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
 
-    private void setupConsumer(final long messages, final TopicPartition topicPartition) {
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+                                                   "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+    }
+
+    private void setupConsumer(final long messages,
+                               final TopicPartition topicPartition) {
         assignPartition(messages, topicPartition);
         addRecords(messages, topicPartition, 0);
         consumer.assign(Collections.<TopicPartition>emptyList());
     }
 
-    private void addRecords(final long messages, final TopicPartition topicPartition, final int startingOffset) {
+    private void addRecords(final long messages,
+                            final TopicPartition topicPartition,
+                            final int startingOffset) {
         for (int i = 0; i < messages; i++) {
             consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), startingOffset + i, new byte[0], new byte[0]));
         }
     }
 
-    private void assignPartition(final long messages, final TopicPartition topicPartition) {
+    private void assignPartition(final long messages,
+                                 final TopicPartition topicPartition) {
         consumer.updatePartitions(topicPartition.topic(),
                                   Collections.singletonList(
                                           new PartitionInfo(topicPartition.topic(),
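The last new test above encodes the heart of the fix: during restore, if the consumer's position overshoots the end offset captured up front, some other instance must have written to the changelog, so the task has been migrated and restoration should abort with TaskMigratedException instead of an IllegalStateException that kills the thread. A self-contained sketch of that guard (names and message text are illustrative, not the StoreChangelogReader internals):

    class EndOffsetGuardSketch {
        static class MigratedException extends RuntimeException {
            MigratedException(final String message) {
                super(message);
            }
        }

        // Called while draining changelog records: position is where the
        // consumer stands, endOffset is what the broker reported when the
        // restore started.
        static void checkOffsetLimit(final long position, final long endOffset, final String taskId) {
            if (position > endOffset) {
                // Someone else appended to the changelog mid-restore, i.e.
                // another instance now owns this task.
                throw new MigratedException("Changelog for task " + taskId
                    + " overshot end offset " + endOffset + " (position " + position + ")");
            }
        }
    }

In the test this is triggered by loading ten records but pinning the reported end offset at 5, with the mocked RestoringTasks returning the task the exception should name.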
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index fd9b19d..7d04040 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -30,15 +30,16 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -72,6 +73,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -857,7 +859,10 @@ public class StreamThreadTest {
         producer.fenceProducer();
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
-        thread.runOnce(-1);
+        try {
+            thread.runOnce(-1);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
         TestUtils.waitForCondition(
             new TestCondition() {
                 @Override
@@ -892,7 +897,10 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
         thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
-        thread.runOnce(-1);
+        try {
+            thread.runOnce(-1);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
 
         assertTrue(thread.tasks().isEmpty());
     }
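Both StreamThreadTest changes make the same point from the caller's side: a fenced producer no longer kills runOnce() with an unhandled error; it surfaces as TaskMigratedException, leaving the thread's outer loop to clean up and rejoin the group. A sketch of the loop shape this implies (illustrative; the real handling lives in StreamThread):

    class RunLoopSketch {
        static class MigratedException extends RuntimeException { }

        interface OnceRunnable {
            void runOnce();
        }

        static void runLoop(final OnceRunnable thread) {
            boolean running = true;
            while (running) {
                try {
                    thread.runOnce();
                } catch (final MigratedException e) {
                    // The tasks were handed to another instance: drop them as
                    // zombies and re-join the consumer group instead of dying.
                    rejoinGroup();
                }
                running = shouldKeepRunning();
            }
        }

        static void rejoinGroup() {
            // e.g. unsubscribe and resubscribe the consumer; details omitted.
        }

        static boolean shouldKeepRunning() {
            return false; // placeholder so the sketch terminates
        }
    }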
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7a87a27..7ee8fae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -316,7 +316,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(changeLogReader.restore()).andReturn(taskId0Partitions);
+        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(Collections.<TopicPartition>emptySet());
 
@@ -327,7 +327,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(changeLogReader.restore()).andReturn(taskId0Partitions);
+        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(taskId0Partitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
deleted file mode 100644
index 54fd858..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.internals.ChangelogReader;
-import org.apache.kafka.streams.processor.internals.StateRestorer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class MockChangelogReader implements ChangelogReader {
-    private final Set<TopicPartition> registered = new HashSet<>();
-
-    @Override
-    public void register(final StateRestorer restorer) {
-        registered.add(restorer.partition());
-    }
-
-    @Override
-    public Collection<TopicPartition> restore() {
-        return registered;
-    }
-
-    @Override
-    public Map<TopicPartition, Long> restoredOffsets() {
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public void reset() {
-        registered.clear();
-    }
-
-    public boolean wasRegistered(final TopicPartition partition) {
-        return registered.contains(partition);
-    }
-}
