This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
     new 4b3bfad  KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out (#10072)
4b3bfad is described below

commit 4b3bfadbe794824d3b443c1c7b6c7e445e5c75a4
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Fri Feb 19 13:36:07 2021 -0800

    KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out (#10072)

    Part of KIP-572: follow-up work to PR #9800. It's not safe to retry a
    TX commit after a timeout, because it's unclear if the commit was
    successful or not, and thus on retry we might get an
    IllegalStateException. Instead, we will throw a TaskCorruptedException
    to retry the TX if the commit failed.

    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>
---
 docs/streams/upgrade-guide.html                    |   9 ++
 .../streams/errors/TaskCorruptedException.java     |  22 ++--
 .../streams/errors/TaskTimeoutExceptions.java      |  58 -----------
 .../processor/internals/ProcessorStateManager.java |  10 +-
 .../processor/internals/RecordCollectorImpl.java   |  22 ++--
 .../processor/internals/StoreChangelogReader.java  |   9 +-
 .../streams/processor/internals/StreamTask.java    |   2 +-
 .../streams/processor/internals/StreamThread.java  |   4 +-
 .../streams/processor/internals/TaskManager.java   | 114 ++++++++++++---------
 .../internals/ProcessorStateManagerTest.java       |   4 +-
 .../processor/internals/StreamThreadTest.java      |  20 ++--
 .../processor/internals/TaskManagerTest.java       |  52 +++++-----
 12 files changed, 141 insertions(+), 185 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 2a6a760..38138e1 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -128,6 +128,15 @@
         into the constructor, it is no longer required to set mandatory configuration parameters
         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument">KIP-680</a>).
     </p>
+    <p>
+        Kafka Streams is now handling <code>TimeoutException</code> thrown by the consumer, producer, and admin client.
+        If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed
+        task in the next iteration.
+        To bound how long Kafka Streams retries a task, you can set <code>task.timeout.ms</code> (default is 5 minutes).
+        If a task does not make progress within the specified task timeout, which is tracked on a per-task basis,
+        Kafka Streams throws a <code>TimeoutException</code>
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>).
+    </p>
 
     <h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
     <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 52f668b..bf5bd17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -17,11 +17,9 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.TaskId;
 
-import java.util.Collection;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Indicates a specific task is corrupted and need to be re-initialized. It can be thrown when
@@ -33,20 +31,20 @@ import java.util.Map;
  */
 public class TaskCorruptedException extends StreamsException {
 
-    private final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs;
+    private final Set<TaskId> corruptedTasks;
 
-    public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
-        super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized");
-        this.taskWithChangelogs = taskWithChangelogs;
+    public TaskCorruptedException(final Set<TaskId> corruptedTasks) {
+        super("Tasks " + corruptedTasks + " are corrupted and hence needs to be re-initialized");
+        this.corruptedTasks = corruptedTasks;
     }
 
-    public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs,
+    public TaskCorruptedException(final Set<TaskId> corruptedTasks,
                                   final InvalidOffsetException e) {
-        super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized", e);
-        this.taskWithChangelogs = taskWithChangelogs;
+        super("Tasks " + corruptedTasks + " are corrupted and hence needs to be re-initialized", e);
+        this.corruptedTasks = corruptedTasks;
     }
 
-    public Map<TaskId, Collection<TopicPartition>> corruptedTaskWithChangelogs() {
-        return taskWithChangelogs;
+    public Set<TaskId> corruptedTasks() {
+        return corruptedTasks;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java
deleted file mode 100644
index 521778d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.streams.processor.internals.Task;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class TaskTimeoutExceptions extends StreamsException {
-    private static final long serialVersionUID = 1L;
-
-    private final TimeoutException timeoutException;
-    private final Map<Task, TimeoutException> exceptions;
-
-    public TaskTimeoutExceptions() {
-        super("");
-        timeoutException = null;
-        exceptions = new HashMap<>();
-    }
-
-    public TaskTimeoutExceptions(final TimeoutException timeoutException) {
-        super("");
-        this.timeoutException = timeoutException;
-        exceptions = null;
-    }
-
-    public void recordException(final Task task,
-                                final TimeoutException timeoutException) {
-        Objects.requireNonNull(exceptions)
-            .put(task, timeoutException);
-    }
-
-    public Map<Task, TimeoutException> exceptions() {
-        return exceptions;
-    }
-
-    public TimeoutException timeoutException() {
-        return timeoutException;
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index b346006..c455c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.ArrayList;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.FixedOrderMap;
@@ -25,12 +24,12 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
@@ -39,6 +38,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -251,7 +251,7 @@ public class ProcessorStateManager implements StateManager {
                              "treat it as a task corruption error and wipe out the local state of task {} " +
                              "before re-bootstrapping", store.stateStore.name(), taskId);
 
-                    throw new TaskCorruptedException(Collections.singletonMap(taskId, changelogPartitions()));
+                    throw new TaskCorruptedException(Collections.singleton(taskId));
                 } else {
                     log.info("State store {} did not find checkpoint offset, hence would " +
                             "default to the starting offset at changelog {}",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 2de9caa..16a451d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
@@ -213,27 +214,24 @@ public class RecordCollectorImpl implements RecordCollector {
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
-            // TODO: KIP-572 handle `TimeoutException extends RetriableException`
-            //  is seems inappropriate to pass `TimeoutException` into the `ProductionExceptionHander`
-            //  -> should we add `TimeoutException` as `isFatalException` above (maybe not) ?
-            //  -> maybe we should try to reset the task by throwing a `TaskCorruptedException` (including triggering `task.timeout.ms`) ?
             if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
                     "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
-            }
-
-            if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
-                errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
-                sendException.set(new StreamsException(errorMessage, exception));
+                sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
             } else {
-                errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
-                droppedRecordsSensor.record();
+                if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
+                    errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
+                    sendException.set(new StreamsException(errorMessage, exception));
+                } else {
+                    errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
+                    droppedRecordsSensor.record();
+                }
             }
         }
 
-        log.error(errorMessage);
+        log.error(errorMessage, exception);
     }
 
     private boolean isFatalException(final Exception exception) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 56394a2..fdf027f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -442,12 +442,9 @@ public class StoreChangelogReader implements ChangelogReader {
                 "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
                 " it later.", e);
 
-            final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
-            for (final TopicPartition partition : e.partitions()) {
-                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-                taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
-            }
-            throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);
+            final Set<TaskId> corruptedTasks = new HashSet<>();
+            e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
+            throw new TaskCorruptedException(corruptedTasks, e);
         } catch (final KafkaException e) {
             throw new StreamsException("Restore consumer get unexpected error polling records.", e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ec9db31..02f2d9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -736,7 +736,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw timeoutException;
             } else {
                 record = null;
-                throw new TaskCorruptedException(Collections.singletonMap(id, changelogPartitions()));
+                throw new TaskCorruptedException(Collections.singleton(id));
             }
         } catch (final StreamsException exception) {
             record = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3dc0c02..b05ee4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -578,10 +578,10 @@ public class StreamThread extends Thread {
                     nextProbingRebalanceMs.set(Long.MAX_VALUE);
                 }
             } catch (final TaskCorruptedException e) {
-                log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
-                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                    taskManager.handleCorruption(e.corruptedTasks());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3ca6876..02fc90c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -32,9 +32,9 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.errors.TaskTimeoutExceptions;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -155,17 +155,16 @@ public class TaskManager {
     /**
      * @throws TaskMigratedException
      */
-    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) {
+    void handleCorruption(final Set<TaskId> corruptedTasks) {
         final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
         final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
 
-        for (final Map.Entry<TaskId, Collection<TopicPartition>> taskEntry : tasksWithChangelogs.entrySet()) {
-            final TaskId taskId = taskEntry.getKey();
+        for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, taskEntry.getValue());
+                corruptedActiveTasks.put(task, task.changelogPartitions());
             } else {
-                corruptedStandbyTasks.put(task, taskEntry.getValue());
+                corruptedStandbyTasks.put(task, task.changelogPartitions());
             }
         }
 
@@ -177,7 +176,7 @@ public class TaskManager {
                 .values()
                 .stream()
                 .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                .filter(t -> !tasksWithChangelogs.containsKey(t.id()))
+                .filter(t -> !corruptedTasks.contains(t.id()))
                 .collect(Collectors.toSet())
         );
 
@@ -529,11 +528,6 @@ public class TaskManager {
         // so we would capture any exception and throw
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
-        } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
-            for (final Map.Entry<Task, TimeoutException> timeoutException : taskTimeoutExceptions.exceptions().entrySet()) {
-                log.error("Exception caught while committing revoked task " + timeoutException.getKey(), timeoutException.getValue());
-            }
-            firstException.compareAndSet(null, taskTimeoutExceptions);
        } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
@@ -841,7 +835,7 @@ public class TaskManager {
             }
 
             // If any active tasks can't be committed, none of them can be, and all that need a commit must be closed dirty
-            if (!tasksToCloseDirty.isEmpty()) {
+            if (processingMode == EXACTLY_ONCE_BETA && !tasksToCloseDirty.isEmpty()) {
                 tasksToCloseClean.removeAll(tasksToCommit);
                 tasksToCloseDirty.addAll(tasksToCommit);
             } else {
@@ -858,15 +852,22 @@ public class TaskManager {
                             tasksToCloseClean.remove(task);
                         }
                     }
-                } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
-                    for (final Map.Entry<Task, TimeoutException> timeoutException : taskTimeoutExceptions.exceptions().entrySet()) {
-                        log.error(
-                            "Exception caught while committing task {} during shutdown {}",
-                            timeoutException.getKey(),
-                            timeoutException.getValue()
-                        );
-                    }
-                    firstException.compareAndSet(null, taskTimeoutExceptions);
+                } catch (final TimeoutException timeoutException) {
+                    firstException.compareAndSet(null, timeoutException);
+
+                    tasksToCloseClean.removeAll(tasksToCommit);
+                    tasksToCloseDirty.addAll(tasksToCommit);
+                } catch (final TaskCorruptedException taskCorruptedException) {
+                    firstException.compareAndSet(null, taskCorruptedException);
+
+                    final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+                    final Set<Task> corruptedTasks = tasksToCommit
+                        .stream()
+                        .filter(task -> corruptedTaskIds.contains(task.id()))
+                        .collect(Collectors.toSet());
+
+                    tasksToCloseClean.removeAll(corruptedTasks);
+                    tasksToCloseDirty.addAll(corruptedTasks);
                 } catch (final RuntimeException e) {
                     log.error("Exception caught while committing tasks during shutdown", e);
                     firstException.compareAndSet(null, e);
@@ -1003,32 +1004,22 @@ public class TaskManager {
             }
         }
 
-        final Set<Task> uncommittedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-        } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
-            final TimeoutException timeoutException = taskTimeoutExceptions.timeoutException();
-            if (timeoutException != null) {
-                consumedOffsetsAndMetadataPerTask
-                    .keySet()
-                    .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
-                uncommittedTasks.addAll(tasksToCommit);
-            } else {
-                for (final Map.Entry<Task, TimeoutException> timeoutExceptions : taskTimeoutExceptions.exceptions().entrySet()) {
-                    final Task task = timeoutExceptions.getKey();
-                    task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutExceptions.getValue());
-                    uncommittedTasks.add(task);
+
+            for (final Task task : tasksToCommit) {
+                if (task.commitNeeded()) {
+                    task.clearTaskTimeout();
+                    ++committed;
+                    task.postCommit(false);
+                }
                 }
             }
+        } catch (final TimeoutException timeoutException) {
+            consumedOffsetsAndMetadataPerTask
+                .keySet()
+                .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
        }
-        for (final Task task : tasksToCommit) {
-            if (task.commitNeeded() && !uncommittedTasks.contains(task)) {
-                task.clearTaskTimeout();
-                ++committed;
-                task.postCommit(false);
-            }
-        }
 
         return committed;
     }
@@ -1054,7 +1045,7 @@ public class TaskManager {
     private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
         log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects
 
-        TaskTimeoutExceptions timeoutExceptions = null;
+        final Set<TaskId> corruptedTasks = new HashSet<>();
 
         if (!offsetsPerTask.isEmpty()) {
             if (processingMode == EXACTLY_ONCE_ALPHA) {
@@ -1064,10 +1055,11 @@ public class TaskManager {
                         tasks.streamsProducerForTask(task.id())
                             .commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
                     } catch (final TimeoutException timeoutException) {
-                        if (timeoutExceptions == null) {
-                            timeoutExceptions = new TaskTimeoutExceptions();
-                        }
-                        timeoutExceptions.recordException(task, timeoutException);
+                        log.error(
+                            String.format("Committing task %s failed.", task.id()),
+                            timeoutException
+                        );
+                        corruptedTasks.add(task.id());
                     }
                 }
             } else {
@@ -1078,7 +1070,18 @@ public class TaskManager {
                 try {
                     tasks.threadProducer().commitTransaction(allOffsets, mainConsumer.groupMetadata());
                 } catch (final TimeoutException timeoutException) {
-                    throw new TaskTimeoutExceptions(timeoutException);
+                    log.error(
+                        String.format("Committing task(s) %s failed.",
+                            offsetsPerTask
+                                .keySet()
+                                .stream()
+                                .map(t -> t.id().toString())
+                                .collect(Collectors.joining(", "))),
+                        timeoutException
+                    );
+                    offsetsPerTask
+                        .keySet()
+                        .forEach(task -> corruptedTasks.add(task.id()));
                 }
             } else {
                 try {
@@ -1087,15 +1090,24 @@ public class TaskManager {
                     throw new TaskMigratedException("Consumer committing offsets failed, " +
                         "indicating the corresponding thread is no longer part of the group", error);
                 } catch (final TimeoutException timeoutException) {
-                    throw new TaskTimeoutExceptions(timeoutException);
+                    log.error(
+                        String.format("Committing task(s) %s failed.",
+                            offsetsPerTask
+                                .keySet()
+                                .stream()
+                                .map(t -> t.id().toString())
+                                .collect(Collectors.joining(", "))),
+                        timeoutException
+                    );
+                    throw timeoutException;
                 } catch (final KafkaException error) {
                     throw new StreamsException("Error encountered committing offsets via consumer", error);
                 }
             }
         }
 
-        if (timeoutExceptions != null) {
-            throw timeoutExceptions;
+        if (!corruptedTasks.isEmpty()) {
+            throw new TaskCorruptedException(corruptedTasks);
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 44609a2..39788e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -904,8 +904,8 @@ public class ProcessorStateManagerTest {
                 () -> stateMgr.initializeStoreOffsetsFromCheckpoint(false));
 
             assertEquals(
-                Collections.singletonMap(taskId, stateMgr.changelogPartitions()),
-                exception.corruptedTaskWithChangelogs()
+                Collections.singleton(taskId),
+                exception.corruptedTasks()
             );
         } finally {
             stateMgr.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e4d083b..bfc32d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1422,7 +1422,7 @@ public class StreamThreadTest {
             "proc",
             () -> record -> {
                 if (shouldThrow.get()) {
-                    throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
+                    throw new TaskCorruptedException(singleton(task1));
                 } else {
                     processed.set(true);
                 }
@@ -1479,7 +1479,7 @@ public class StreamThreadTest {
         final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce);
 
         // Now, we can handle the corruption
-        thread.taskManager().handleCorruption(taskCorruptedException.corruptedTaskWithChangelogs());
+        thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
 
         // again, complete the restoration
         thread.runOnce();
@@ -2261,16 +2261,14 @@ public class StreamThreadTest {
         final TaskId taskId1 = new TaskId(0, 0);
         final TaskId taskId2 = new TaskId(0, 2);
 
-        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
-            mkEntry(taskId1, emptySet())
-        );
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
 
         expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task1.id()).andReturn(taskId1).anyTimes();
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        taskManager.handleCorruption(corruptedTasksWithChangelogs);
+        taskManager.handleCorruption(corruptedTasks);
 
         EasyMock.replay(task1, task2, taskManager, consumer);
 
@@ -2298,7 +2296,7 @@ public class StreamThreadTest {
             @Override
             void runOnce() {
                 setState(State.PENDING_SHUTDOWN);
-                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+                throw new TaskCorruptedException(corruptedTasks);
             }
         }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
@@ -2326,16 +2324,14 @@ public class StreamThreadTest {
         final TaskId taskId1 = new TaskId(0, 0);
         final TaskId taskId2 = new TaskId(0, 2);
 
-        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
-            mkEntry(taskId1, emptySet())
-        );
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
 
         expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task1.id()).andReturn(taskId1).anyTimes();
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        taskManager.handleCorruption(corruptedTasksWithChangelogs);
+        taskManager.handleCorruption(corruptedTasks);
         expectLastCall().andThrow(new TaskMigratedException("Task migrated",
             new RuntimeException("non-corrupted task migrated")));
 
@@ -2368,7 +2364,7 @@ public class StreamThreadTest {
             @Override
             void runOnce() {
                 setState(State.PENDING_SHUTDOWN);
-                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+                throw new TaskCorruptedException(corruptedTasks);
             }
         }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6e6a848..501dbe0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -653,7 +654,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        task00.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
 
         assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -697,7 +699,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        task00.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
         assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CREATED));
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
@@ -742,7 +745,8 @@ public class TaskManagerTest {
         assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
         nonCorruptedTask.setCommitNeeded();
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
 
         assertTrue(nonCorruptedTask.commitPrepared);
         verify(consumer);
@@ -781,7 +785,8 @@ public class TaskManagerTest {
         taskManager.handleAssignment(assignment, emptyMap());
 
         assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
 
         verify(activeTaskCreator);
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
@@ -821,7 +826,8 @@ public class TaskManagerTest {
 
         runningNonCorruptedActive.setCommitNeeded();
 
-        assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
+        corruptedStandby.setChangelogOffsets(singletonMap(t1p0, 0L));
+        assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singleton(taskId00)));
 
         assertThat(corruptedStandby.commitPrepared, is(true));
@@ -2717,19 +2723,18 @@ public class TaskManagerTest {
         task00.setCommitNeeded();
         task01.setCommitNeeded();
 
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(1));
-        assertThat(task00.timeout, equalTo(time.milliseconds()));
-        assertNull(task01.timeout);
-        assertNull(task02.timeout);
-
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(1));
-        assertNull(task00.timeout);
-        assertNull(task01.timeout);
-        assertNull(task02.timeout);
+        final TaskCorruptedException exception = assertThrows(
+            TaskCorruptedException.class,
+            () -> taskManager.commit(mkSet(task00, task01, task02))
+        );
+        assertThat(
+            exception.corruptedTasks(),
+            equalTo(Collections.singleton(taskId00))
+        );
     }
 
     @Test
-    public void shouldNotFailForTimeoutExceptionOnCommitWithEosBeta() {
+    public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosBeta() {
         setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
@@ -2759,15 +2764,14 @@ public class TaskManagerTest {
         task00.setCommitNeeded();
         task01.setCommitNeeded();
 
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(0));
-        assertThat(task00.timeout, equalTo(time.milliseconds()));
-        assertThat(task01.timeout, equalTo(time.milliseconds()));
-        assertNull(task02.timeout);
-
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(2));
-        assertNull(task00.timeout);
-        assertNull(task01.timeout);
-        assertNull(task02.timeout);
+        final TaskCorruptedException exception = assertThrows(
+            TaskCorruptedException.class,
+            () -> taskManager.commit(mkSet(task00, task01, task02))
+        );
+        assertThat(
+            exception.corruptedTasks(),
+            equalTo(mkSet(taskId00, taskId01))
+        );
     }
 
     @Test
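
The crux of the change is visible in commitOffsetsOrTransaction() above: once a
transactional commit times out, the broker-side outcome is unknown, so the safe
move is to report the task as corrupted (forcing a wipe-and-restore) rather than
retry the commit. A minimal, self-contained sketch of that pattern follows; the
TxCommitter callback is a hypothetical stand-in for the internal StreamsProducer
plumbing, not a real Kafka API, while TaskCorruptedException and TaskId are the
public classes used in the diff:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.streams.errors.TaskCorruptedException;
    import org.apache.kafka.streams.processor.TaskId;

    public final class EosCommitSketch {

        /** Hypothetical stand-in for the per-task transactional commit. */
        public interface TxCommitter {
            void commitTransaction(TaskId task) throws TimeoutException;
        }

        /**
         * Commits each task's transaction. A TimeoutException is NOT retried,
         * because the commit may or may not have reached the broker; instead
         * the task is collected and reported as corrupted, mirroring how the
         * EXACTLY_ONCE_ALPHA branch above collects corruptedTasks.
         */
        public static void commitAll(final Set<TaskId> tasks, final TxCommitter committer) {
            final Set<TaskId> corruptedTasks = new HashSet<>();
            for (final TaskId task : tasks) {
                try {
                    committer.commitTransaction(task);
                } catch (final TimeoutException timeout) {
                    corruptedTasks.add(task); // unknown outcome: do not retry
                }
            }
            if (!corruptedTasks.isEmpty()) {
                throw new TaskCorruptedException(corruptedTasks);
            }
        }
    }

On the next loop iteration the StreamThread catches the TaskCorruptedException
(see the StreamThread.java hunk above), closes the named tasks dirty, and
re-bootstraps their state from the changelogs.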
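
The upgrade-guide paragraph added above documents the KIP-572 retry bound. A
minimal sketch of configuring it — the application id, bootstrap servers, and
the chosen timeout are placeholders, while the config key task.timeout.ms
(constant StreamsConfig.TASK_TIMEOUT_MS_CONFIG) and its 5-minute default are as
described in the guide:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public final class TaskTimeoutConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Bound how long a task may fail to make progress before Kafka
            // Streams rethrows the TimeoutException (default: 5 minutes).
            props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 60_000L);
            System.out.println(props);
        }
    }

Per KIP-572, setting the value to 0 fails the task on the first timeout
observed; the default of 5 minutes keeps a struggling task retrying across loop
iterations without blocking the thread's other tasks.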