This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 4b3bfad  KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out (#10072)
4b3bfad is described below

commit 4b3bfadbe794824d3b443c1c7b6c7e445e5c75a4
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Fri Feb 19 13:36:07 2021 -0800

    KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out (#10072)
    
    Part of KIP-572: follow-up work to PR #9800. It is not safe to retry a TX
    commit after a timeout, because it is unclear whether the commit succeeded
    or not, and thus on retry we might get an IllegalStateException. Instead,
    we throw a TaskCorruptedException to retry the TX if the commit failed.
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>
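
    The change, as a minimal sketch (not code from this commit: `TxCommitter`
    and `commitOrCorrupt` are hypothetical stand-ins for the producer commit
    path and the TaskManager logic in the diff below):

        import java.util.HashSet;
        import java.util.Set;

        import org.apache.kafka.common.errors.TimeoutException;
        import org.apache.kafka.streams.errors.TaskCorruptedException;
        import org.apache.kafka.streams.processor.TaskId;

        final class CommitSketch {

            // Hypothetical stand-in for the EOS commit path, e.g. a transactional producer.
            interface TxCommitter {
                void commitTransaction();
            }

            // On a commit timeout, do not retry the commit itself: it may already
            // have succeeded on the broker, and a second attempt can surface an
            // IllegalStateException. Instead, mark every task in the transaction
            // as corrupted so it is closed dirty and re-initialized, which
            // effectively retries the TX from scratch.
            static void commitOrCorrupt(final Set<TaskId> taskIds, final TxCommitter committer) {
                try {
                    committer.commitTransaction();
                } catch (final TimeoutException timeoutException) {
                    throw new TaskCorruptedException(new HashSet<>(taskIds));
                }
            }
        }

    The StreamThread that catches the TaskCorruptedException then calls
    taskManager.handleCorruption(e.corruptedTasks()), as the StreamThread.java
    hunk below shows.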
---
 docs/streams/upgrade-guide.html                    |   9 ++
 .../streams/errors/TaskCorruptedException.java     |  22 ++--
 .../streams/errors/TaskTimeoutExceptions.java      |  58 -----------
 .../processor/internals/ProcessorStateManager.java |  10 +-
 .../processor/internals/RecordCollectorImpl.java   |  22 ++--
 .../processor/internals/StoreChangelogReader.java  |   9 +-
 .../streams/processor/internals/StreamTask.java    |   2 +-
 .../streams/processor/internals/StreamThread.java  |   4 +-
 .../streams/processor/internals/TaskManager.java   | 114 ++++++++++++---------
 .../internals/ProcessorStateManagerTest.java       |   4 +-
 .../processor/internals/StreamThreadTest.java      |  20 ++--
 .../processor/internals/TaskManagerTest.java       |  52 +++++-----
 12 files changed, 141 insertions(+), 185 deletions(-)
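
The upgrade-guide hunk below documents the new task.timeout.ms bound from
KIP-572. As a minimal sketch of setting it (hypothetical application id and
bootstrap servers, assuming the 2.8 StreamsConfig constant
TASK_TIMEOUT_MS_CONFIG):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    final class TaskTimeoutConfigSketch {
        static Properties streamsProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            // Bound how long Kafka Streams retries a timed-out task; the default is 5 minutes.
            props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300_000L);
            return props;
        }
    }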

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 2a6a760..38138e1 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -128,6 +128,15 @@
         into the constructor, it is no longer required to set mandatory configuration parameters
         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument">KIP-680</a>).
     </p>
+    <p>
+        Kafka Streams is now handling <code>TimeoutException</code> thrown by the consumer, producer, and admin client.
+        If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed
+        task in the next iteration.
+        To bound how long Kafka Streams retries a task, you can set <code>task.timeout.ms</code> (default is 5 minutes).
+        If a task does not make progress within the specified task timeout, which is tracked on a per-task basis,
+        Kafka Streams throws a <code>TimeoutException</code>
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>).
+    </p>
 
     <h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
     <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 52f668b..bf5bd17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -17,11 +17,9 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.TaskId;
 
-import java.util.Collection;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Indicates a specific task is corrupted and need to be re-initialized. It can be thrown when
@@ -33,20 +31,20 @@ import java.util.Map;
  */
 public class TaskCorruptedException extends StreamsException {
 
-    private final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs;
+    private final Set<TaskId> corruptedTasks;
 
-    public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
-        super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized");
-        this.taskWithChangelogs = taskWithChangelogs;
+    public TaskCorruptedException(final Set<TaskId> corruptedTasks) {
+        super("Tasks " + corruptedTasks + " are corrupted and hence needs to be re-initialized");
+        this.corruptedTasks = corruptedTasks;
     }
 
-    public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs,
+    public TaskCorruptedException(final Set<TaskId> corruptedTasks,
                                   final InvalidOffsetException e) {
-        super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized", e);
-        this.taskWithChangelogs = taskWithChangelogs;
+        super("Tasks " + corruptedTasks + " are corrupted and hence needs to be re-initialized", e);
+        this.corruptedTasks = corruptedTasks;
     }
 
-    public Map<TaskId, Collection<TopicPartition>> corruptedTaskWithChangelogs() {
-        return taskWithChangelogs;
+    public Set<TaskId> corruptedTasks() {
+        return corruptedTasks;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java
deleted file mode 100644
index 521778d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.streams.processor.internals.Task;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class TaskTimeoutExceptions extends StreamsException {
-    private static final long serialVersionUID = 1L;
-
-    private final TimeoutException timeoutException;
-    private final Map<Task, TimeoutException> exceptions;
-
-    public TaskTimeoutExceptions() {
-        super("");
-        timeoutException = null;
-        exceptions = new HashMap<>();
-    }
-
-    public TaskTimeoutExceptions(final TimeoutException timeoutException) {
-        super("");
-        this.timeoutException = timeoutException;
-        exceptions = null;
-    }
-
-    public void recordException(final Task task,
-                                final TimeoutException timeoutException) {
-        Objects.requireNonNull(exceptions)
-            .put(task, timeoutException);
-    }
-
-    public Map<Task, TimeoutException> exceptions() {
-        return exceptions;
-    }
-
-    public TimeoutException timeoutException() {
-        return timeoutException;
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index b346006..c455c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.ArrayList;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.FixedOrderMap;
@@ -25,12 +24,12 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
@@ -39,6 +38,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -251,7 +251,7 @@ public class ProcessorStateManager implements StateManager {
                                 "treat it as a task corruption error and wipe 
out the local state of task {} " +
                                 "before re-bootstrapping", 
store.stateStore.name(), taskId);
 
-                            throw new 
TaskCorruptedException(Collections.singletonMap(taskId, changelogPartitions()));
+                            throw new 
TaskCorruptedException(Collections.singleton(taskId));
                         } else {
                             log.info("State store {} did not find checkpoint 
offset, hence would " +
                                 "default to the starting offset at changelog 
{}",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 2de9caa..16a451d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
@@ -213,27 +214,24 @@ public class RecordCollectorImpl implements RecordCollector {
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
-            // TODO: KIP-572 handle `TimeoutException extends RetriableException`
-            // is seems inappropriate to pass `TimeoutException` into the `ProductionExceptionHander`
-            // -> should we add `TimeoutException` as `isFatalException` above (maybe not) ?
-            // -> maybe we should try to reset the task by throwing a `TaskCorruptedException` (including triggering `task.timeout.ms`) ?
             if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
                     "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
-            }
-
-            if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
-                errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
-                sendException.set(new StreamsException(errorMessage, exception));
+                sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
             } else {
-                errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
-                droppedRecordsSensor.record();
+                if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
+                    errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
+                    sendException.set(new StreamsException(errorMessage, exception));
+                } else {
+                    errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
+                    droppedRecordsSensor.record();
+                }
             }
             }
         }
 
-        log.error(errorMessage);
+        log.error(errorMessage, exception);
     }
 
     private boolean isFatalException(final Exception exception) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 56394a2..fdf027f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -442,12 +442,9 @@ public class StoreChangelogReader implements ChangelogReader {
                     "truncated or compacted on the broker, marking the 
corresponding tasks as corrupted and re-initializing" +
                     " it later.", e);
 
-                final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
-                for (final TopicPartition partition : e.partitions()) {
-                    final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-                    taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
-                }
-                throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);
+                final Set<TaskId> corruptedTasks = new HashSet<>();
+                e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
+                throw new TaskCorruptedException(corruptedTasks, e);
             } catch (final KafkaException e) {
                 throw new StreamsException("Restore consumer get unexpected 
error polling records.", e);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ec9db31..02f2d9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -736,7 +736,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw timeoutException;
             } else {
                 record = null;
-                throw new TaskCorruptedException(Collections.singletonMap(id, changelogPartitions()));
+                throw new TaskCorruptedException(Collections.singleton(id));
             }
         } catch (final StreamsException exception) {
             record = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3dc0c02..b05ee4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -578,10 +578,10 @@ public class StreamThread extends Thread {
                     nextProbingRebalanceMs.set(Long.MAX_VALUE);
                 }
             } catch (final TaskCorruptedException e) {
-                log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
-                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                    taskManager.handleCorruption(e.corruptedTasks());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3ca6876..02fc90c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -32,9 +32,9 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.errors.TaskTimeoutExceptions;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -155,17 +155,16 @@ public class TaskManager {
     /**
      * @throws TaskMigratedException
      */
-    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) {
+    void handleCorruption(final Set<TaskId> corruptedTasks) {
         final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
         final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
 
-        for (final Map.Entry<TaskId, Collection<TopicPartition>> taskEntry : tasksWithChangelogs.entrySet()) {
-            final TaskId taskId = taskEntry.getKey();
+        for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, taskEntry.getValue());
+                corruptedActiveTasks.put(task, task.changelogPartitions());
             } else {
-                corruptedStandbyTasks.put(task, taskEntry.getValue());
+                corruptedStandbyTasks.put(task, task.changelogPartitions());
             }
         }
 
@@ -177,7 +176,7 @@ public class TaskManager {
                    .values()
                    .stream()
                    .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                   .filter(t -> !tasksWithChangelogs.containsKey(t.id()))
+                   .filter(t -> !corruptedTasks.contains(t.id()))
                    .collect(Collectors.toSet())
         );
 
@@ -529,11 +528,6 @@ public class TaskManager {
         // so we would capture any exception and throw
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
-        } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
-            for (final Map.Entry<Task, TimeoutException> timeoutException : taskTimeoutExceptions.exceptions().entrySet()) {
-                log.error("Exception caught while committing revoked task " + timeoutException.getKey(), timeoutException.getValue());
-            }
-            firstException.compareAndSet(null, taskTimeoutExceptions);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
@@ -841,7 +835,7 @@ public class TaskManager {
         }
 
         // If any active tasks can't be committed, none of them can be, and all that need a commit must be closed dirty
-        if (!tasksToCloseDirty.isEmpty()) {
+        if (processingMode == EXACTLY_ONCE_BETA && !tasksToCloseDirty.isEmpty()) {
             tasksToCloseClean.removeAll(tasksToCommit);
             tasksToCloseDirty.addAll(tasksToCommit);
         } else {
@@ -858,15 +852,22 @@ public class TaskManager {
                         tasksToCloseClean.remove(task);
                     }
                 }
-            } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
-                for (final Map.Entry<Task, TimeoutException> timeoutException : taskTimeoutExceptions.exceptions().entrySet()) {
-                    log.error(
-                        "Exception caught while committing task {} during shutdown {}",
-                        timeoutException.getKey(),
-                        timeoutException.getValue()
-                    );
-                }
-                firstException.compareAndSet(null, taskTimeoutExceptions);
+            } catch (final TimeoutException timeoutException) {
+                firstException.compareAndSet(null, timeoutException);
+
+                tasksToCloseClean.removeAll(tasksToCommit);
+                tasksToCloseDirty.addAll(tasksToCommit);
+            } catch (final TaskCorruptedException taskCorruptedException) {
+                firstException.compareAndSet(null, taskCorruptedException);
+
+                final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+                final Set<Task> corruptedTasks = tasksToCommit
+                        .stream()
+                        .filter(task -> corruptedTaskIds.contains(task.id()))
+                        .collect(Collectors.toSet());
+
+                tasksToCloseClean.removeAll(corruptedTasks);
+                tasksToCloseDirty.addAll(corruptedTasks);
             } catch (final RuntimeException e) {
                 log.error("Exception caught while committing tasks during 
shutdown", e);
                 firstException.compareAndSet(null, e);
@@ -1003,32 +1004,22 @@ public class TaskManager {
                 }
             }
 
-            final Set<Task> uncommittedTasks = new HashSet<>();
             try {
                 commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-            } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
-                final TimeoutException timeoutException = taskTimeoutExceptions.timeoutException();
-                if (timeoutException != null) {
-                    consumedOffsetsAndMetadataPerTask
-                        .keySet()
-                        .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
-                    uncommittedTasks.addAll(tasksToCommit);
-                } else {
-                    for (final Map.Entry<Task, TimeoutException> timeoutExceptions : taskTimeoutExceptions.exceptions().entrySet()) {
-                        final Task task = timeoutExceptions.getKey();
-                        task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutExceptions.getValue());
-                        uncommittedTasks.add(task);
+
+                for (final Task task : tasksToCommit) {
+                    if (task.commitNeeded()) {
+                        task.clearTaskTimeout();
+                        ++committed;
+                        task.postCommit(false);
                     }
                 }
+            } catch (final TimeoutException timeoutException) {
+                consumedOffsetsAndMetadataPerTask
+                    .keySet()
+                    .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
             }
 
-            for (final Task task : tasksToCommit) {
-                if (task.commitNeeded() && !uncommittedTasks.contains(task)) {
-                    task.clearTaskTimeout();
-                    ++committed;
-                    task.postCommit(false);
-                }
-            }
 
             return committed;
         }
@@ -1054,7 +1045,7 @@ public class TaskManager {
     private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
         log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects
 
-        TaskTimeoutExceptions timeoutExceptions = null;
+        final Set<TaskId> corruptedTasks = new HashSet<>();
 
         if (!offsetsPerTask.isEmpty()) {
             if (processingMode == EXACTLY_ONCE_ALPHA) {
@@ -1064,10 +1055,11 @@ public class TaskManager {
                         tasks.streamsProducerForTask(task.id())
                             .commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
                     } catch (final TimeoutException timeoutException) {
-                        if (timeoutExceptions == null) {
-                            timeoutExceptions = new TaskTimeoutExceptions();
-                        }
-                        timeoutExceptions.recordException(task, timeoutException);
+                        log.error(
+                            String.format("Committing task %s failed.", task.id()),
+                            timeoutException
+                        );
+                        corruptedTasks.add(task.id());
                     }
                 }
             } else {
@@ -1078,7 +1070,18 @@ public class TaskManager {
                     try {
                         tasks.threadProducer().commitTransaction(allOffsets, mainConsumer.groupMetadata());
                     } catch (final TimeoutException timeoutException) {
-                        throw new TaskTimeoutExceptions(timeoutException);
+                        log.error(
+                            String.format("Committing task(s) %s failed.",
+                                offsetsPerTask
+                                    .keySet()
+                                    .stream()
+                                    .map(t -> t.id().toString())
+                                    .collect(Collectors.joining(", "))),
+                            timeoutException
+                        );
+                        offsetsPerTask
+                            .keySet()
+                            .forEach(task -> corruptedTasks.add(task.id()));
                     }
                 } else {
                     try {
@@ -1087,15 +1090,24 @@ public class TaskManager {
                         throw new TaskMigratedException("Consumer committing offsets failed, " +
                                                             "indicating the corresponding thread is no longer part of the group", error);
                     } catch (final TimeoutException timeoutException) {
-                        throw new TaskTimeoutExceptions(timeoutException);
+                        log.error(
+                            String.format("Committing task(s) %s failed.",
+                                offsetsPerTask
+                                    .keySet()
+                                    .stream()
+                                    .map(t -> t.id().toString())
+                                    .collect(Collectors.joining(", "))),
+                            timeoutException
+                        );
+                        throw timeoutException;
                     } catch (final KafkaException error) {
                         throw new StreamsException("Error encountered 
committing offsets via consumer", error);
                     }
                 }
             }
 
-            if (timeoutExceptions != null) {
-                throw timeoutExceptions;
+            if (!corruptedTasks.isEmpty()) {
+                throw new TaskCorruptedException(corruptedTasks);
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 44609a2..39788e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -904,8 +904,8 @@ public class ProcessorStateManagerTest {
                 () -> stateMgr.initializeStoreOffsetsFromCheckpoint(false));
 
             assertEquals(
-                Collections.singletonMap(taskId, stateMgr.changelogPartitions()),
-                exception.corruptedTaskWithChangelogs()
+                Collections.singleton(taskId),
+                exception.corruptedTasks()
             );
         } finally {
             stateMgr.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e4d083b..bfc32d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1422,7 +1422,7 @@ public class StreamThreadTest {
             "proc",
             () -> record -> {
                 if (shouldThrow.get()) {
-                    throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
+                    throw new TaskCorruptedException(singleton(task1));
                 } else {
                     processed.set(true);
                 }
@@ -1479,7 +1479,7 @@ public class StreamThreadTest {
         final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce);
 
         // Now, we can handle the corruption
-        thread.taskManager().handleCorruption(taskCorruptedException.corruptedTaskWithChangelogs());
+        thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
 
         // again, complete the restoration
         thread.runOnce();
@@ -2261,16 +2261,14 @@ public class StreamThreadTest {
         final TaskId taskId1 = new TaskId(0, 0);
         final TaskId taskId2 = new TaskId(0, 2);
 
-        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
-            mkEntry(taskId1, emptySet())
-        );
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
 
         expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task1.id()).andReturn(taskId1).anyTimes();
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        taskManager.handleCorruption(corruptedTasksWithChangelogs);
+        taskManager.handleCorruption(corruptedTasks);
 
         EasyMock.replay(task1, task2, taskManager, consumer);
 
@@ -2298,7 +2296,7 @@ public class StreamThreadTest {
             @Override
             void runOnce() {
                 setState(State.PENDING_SHUTDOWN);
-                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+                throw new TaskCorruptedException(corruptedTasks);
             }
         }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
@@ -2326,16 +2324,14 @@ public class StreamThreadTest {
         final TaskId taskId1 = new TaskId(0, 0);
         final TaskId taskId2 = new TaskId(0, 2);
 
-        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
-            mkEntry(taskId1, emptySet())
-        );
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
 
         expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task1.id()).andReturn(taskId1).anyTimes();
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        taskManager.handleCorruption(corruptedTasksWithChangelogs);
+        taskManager.handleCorruption(corruptedTasks);
         expectLastCall().andThrow(new TaskMigratedException("Task migrated",
                                                             new RuntimeException("non-corrupted task migrated")));
 
@@ -2368,7 +2364,7 @@ public class StreamThreadTest {
             @Override
             void runOnce() {
                 setState(State.PENDING_SHUTDOWN);
-                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
+                throw new TaskCorruptedException(corruptedTasks);
             }
         }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6e6a848..501dbe0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -653,7 +654,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        task00.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
 
         assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -697,7 +699,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        task00.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
         assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CREATED));
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
@@ -742,7 +745,8 @@ public class TaskManagerTest {
         assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
         nonCorruptedTask.setCommitNeeded();
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
 
         assertTrue(nonCorruptedTask.commitPrepared);
         verify(consumer);
@@ -781,7 +785,8 @@ public class TaskManagerTest {
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
 
-        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+        corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
+        taskManager.handleCorruption(singleton(taskId00));
 
         verify(activeTaskCreator);
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
@@ -821,7 +826,8 @@ public class TaskManagerTest {
 
         runningNonCorruptedActive.setCommitNeeded();
 
-        assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
+        corruptedStandby.setChangelogOffsets(singletonMap(t1p0, 0L));
+        assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singleton(taskId00)));
 
 
         assertThat(corruptedStandby.commitPrepared, is(true));
@@ -2717,19 +2723,18 @@ public class TaskManagerTest {
         task00.setCommitNeeded();
         task01.setCommitNeeded();
 
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(1));
-        assertThat(task00.timeout, equalTo(time.milliseconds()));
-        assertNull(task01.timeout);
-        assertNull(task02.timeout);
-
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(1));
-        assertNull(task00.timeout);
-        assertNull(task01.timeout);
-        assertNull(task02.timeout);
+        final TaskCorruptedException exception = assertThrows(
+            TaskCorruptedException.class,
+            () -> taskManager.commit(mkSet(task00, task01, task02))
+        );
+        assertThat(
+            exception.corruptedTasks(),
+            equalTo(Collections.singleton(taskId00))
+        );
     }
 
     @Test
-    public void shouldNotFailForTimeoutExceptionOnCommitWithEosBeta() {
+    public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosBeta() {
         setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
@@ -2759,15 +2764,14 @@ public class TaskManagerTest {
         task00.setCommitNeeded();
         task01.setCommitNeeded();
 
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(0));
-        assertThat(task00.timeout, equalTo(time.milliseconds()));
-        assertThat(task01.timeout, equalTo(time.milliseconds()));
-        assertNull(task02.timeout);
-
-        assertThat(taskManager.commit(mkSet(task00, task01, task02)), equalTo(2));
-        assertNull(task00.timeout);
-        assertNull(task01.timeout);
-        assertNull(task02.timeout);
+        final TaskCorruptedException exception = assertThrows(
+            TaskCorruptedException.class,
+            () -> taskManager.commit(mkSet(task00, task01, task02))
+        );
+        assertThat(
+            exception.corruptedTasks(),
+            equalTo(mkSet(taskId00, taskId01))
+        );
     }
 
     @Test
