This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 0f2f7e6 MINOR: improve logging of tasks on shutdown (#7597)
0f2f7e6 is described below
commit 0f2f7e689429cbc2c4474149386d2a2d37264716
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Oct 29 10:15:39 2019 -0700
MINOR: improve logging of tasks on shutdown (#7597)
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/processor/internals/AbstractTask.java | 5 +-
.../processor/internals/AssignedStandbyTasks.java | 10 ++++
.../processor/internals/AssignedStreamsTasks.java | 62 +++++++++++++---------
.../streams/processor/internals/AssignedTasks.java | 2 +-
.../processor/internals/StoreChangelogReader.java | 2 +
.../streams/processor/internals/TaskManager.java | 7 +--
.../internals/AssignedStreamsTasksTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 11 ++--
.../processor/internals/StreamThreadTest.java | 18 ++++---
.../processor/internals/TaskManagerTest.java | 4 +-
10 files changed, 75 insertions(+), 48 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d8494fa..ede47ba 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -72,7 +72,8 @@ public abstract class AbstractTask implements Task {
this.eosEnabled =
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
this.stateDirectory = stateDirectory;
- this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task"
: "task", id);
+ final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
+ this.logPrefix = threadIdPrefix + String.format("%s [%s] ", isStandby
? "standby-task" : "task", id);
this.logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
@@ -197,7 +198,7 @@ public abstract class AbstractTask implements Task {
log.trace("Initializing state stores");
for (final StateStore store : topology.stateStores()) {
- log.trace("Initializing store {}", store.name());
+ log.debug("Initializing store {}", store.name());
processorContext.uninitialize();
store.init(processorContext, store);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 9783970..0c9a70d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -31,6 +31,16 @@ class AssignedStandbyTasks extends
AssignedTasks<StandbyTask> {
}
@Override
+ public void shutdown(final boolean clean) {
+ // shutdown type is part of the message text, so only the task id sets are passed as {} arguments
+ log.debug((clean ? "Clean" : "Unclean") + " shutdown of all standby tasks" + "\n" +
+ "created tasks to close: {}" + "\n" +
+ "running tasks to close: {}",
+ created.keySet(), running.keySet());
+ super.shutdown(clean);
+ }
+
+ @Override
int commit() {
final int committed = super.commit();
// TODO: this contortion would not be necessary if we got rid of the
two-step
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index cba17d0..3515824 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -111,7 +111,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
} else if (restoring.containsKey(task)) {
revokedRestoringTasks.add(task);
} else if (!suspended.containsKey(task)) {
- log.warn("Task {} was revoked but cannot be found in the
assignment, may have been closed due to error", task);
+ log.warn("Stream task {} was revoked but cannot be found in
the assignment, may have been closed due to error", task);
}
}
@@ -126,7 +126,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
final List<TopicPartition>
taskChangelogs) {
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
- log.debug("Suspending running {} {}", taskTypeName, running.keySet());
+ log.debug("Suspending the running stream tasks {}", running.keySet());
for (final TaskId id : runningTasksToSuspend) {
final StreamTask task = running.get(id);
@@ -136,20 +136,20 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
suspended.put(id, task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// swallow and move on since we are rebalancing
- log.info("Failed to suspend {} {} since it got migrated to
another thread already. " +
- "Closing it as zombie and move on.", taskTypeName, id);
+ log.info("Failed to suspend the stream task {} since it got
migrated to another thread already. " +
+ "Closing it as zombie and moving on.", id);
firstException.compareAndSet(null, closeZombieTask(task));
prevActiveTasks.remove(id);
} catch (final RuntimeException e) {
- log.error("Suspending {} {} failed due to the following
error:", taskTypeName, id, e);
+ log.error("Suspending the stream task {} failed due to the
following error:", id, e);
firstException.compareAndSet(null, e);
try {
prevActiveTasks.remove(id);
task.close(false, false);
} catch (final RuntimeException f) {
log.error(
- "After suspending failed, closing the same {} {}
failed again due to the following error:",
- taskTypeName, id, f);
+ "After suspending failed, closing the same stream task
{} failed again due to the following error:",
+ id, f);
}
} finally {
running.remove(id);
@@ -159,14 +159,14 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
}
- log.trace("Successfully suspended the running {} {}", taskTypeName,
suspended.keySet());
+ log.trace("Successfully suspended the running stream task {}",
suspended.keySet());
return firstException.get();
}
private RuntimeException closeNonRunningTasks(final Set<TaskId>
nonRunningTasksToClose,
final List<TopicPartition>
closedTaskChangelogs) {
- log.debug("Closing the created but not initialized {} {}",
taskTypeName, nonRunningTasksToClose);
+ log.debug("Closing the created but not initialized stream tasks {}",
nonRunningTasksToClose);
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>();
for (final TaskId id : nonRunningTasksToClose) {
@@ -202,7 +202,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
final boolean clean = !isZombie;
task.close(clean, isZombie);
} catch (final RuntimeException e) {
- log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
+ log.error("Failed to close the stream task {}", task.id(), e);
return e;
}
@@ -218,7 +218,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
try {
task.close(false, isZombie);
} catch (final RuntimeException e) {
- log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
+ log.error("Failed to close the stream task {}", task.id(), e);
return e;
}
@@ -239,7 +239,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
final boolean clean = !isZombie;
task.closeStateManager(clean);
} catch (final RuntimeException e) {
- log.error("Failed to close restoring task {} due to the following
error:", task.id(), e);
+ log.error("Failed to close the restoring stream task {} due to the
following error:", task.id(), e);
return e;
}
@@ -254,7 +254,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
final boolean clean = !isZombie;
task.closeSuspended(clean, null);
} catch (final RuntimeException e) {
- log.error("Failed to close suspended {} {} due to the following
error:", taskTypeName, task.id(), e);
+ log.error("Failed to close the suspended stream task {} due to the
following error:", task.id(), e);
return e;
}
@@ -262,7 +262,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
RuntimeException closeNotAssignedSuspendedTasks(final Set<TaskId>
revokedTasks) {
- log.debug("Closing the revoked active tasks {} {}", taskTypeName,
revokedTasks);
+ log.debug("Closing the revoked active stream tasks {}", revokedTasks);
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
for (final TaskId revokedTask : revokedTasks) {
@@ -271,7 +271,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
if (suspendedTask != null) {
firstException.compareAndSet(null, closeSuspended(false,
suspendedTask));
} else {
- log.debug("Revoked task {} could not be found in suspended,
may have already been closed", revokedTask);
+ log.debug("Revoked stream task {} could not be found in
suspended, may have already been closed", revokedTask);
}
}
return firstException.get();
@@ -301,7 +301,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
// With the current rebalance protocol, there should not be any
running tasks left as they were all lost
if (!prevActiveTasks.isEmpty()) {
- log.error("Found still running {} after closing all tasks lost as
zombies", taskTypeName);
+ log.error("Found the still running stream tasks {} after closing
all tasks lost as zombies", prevActiveTasks);
firstException.compareAndSet(null, new IllegalStateException("Not
all lost tasks were closed as zombies"));
}
return firstException.get();
@@ -314,7 +314,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final StreamTask task = suspended.get(taskId);
- log.trace("Found suspended {} {}", taskTypeName, taskId);
+ log.trace("Found suspended stream task {}", taskId);
suspended.remove(taskId);
if (task.partitions().equals(partitions)) {
@@ -324,8 +324,8 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
} catch (final TaskMigratedException e) {
// we need to catch migration exception internally since
this function
// is triggered in the rebalance callback
- log.info("Failed to resume {} {} since it got migrated to
another thread already. " +
- "Closing it as zombie before triggering a new
rebalance.", taskTypeName, task.id());
+ log.info("Failed to resume the stream task {} since it got
migrated to another thread already. " +
+ "Closing it as zombie before triggering a new
rebalance.", task.id());
final RuntimeException fatalException =
closeZombieTask(task);
running.remove(taskId);
@@ -334,10 +334,10 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
throw e;
}
- log.trace("Resuming suspended {} {}", taskTypeName, task.id());
+ log.trace("Resuming the suspended stream task {}", task.id());
return true;
} else {
- log.warn("Couldn't resume task {} assigned partitions {}, task
partitions {}", taskId, partitions, task.partitions());
+ log.warn("Couldn't resume stream task {} assigned partitions
{}, task partitions {}", taskId, partitions, task.partitions());
task.closeSuspended(true, null);
}
}
@@ -398,10 +398,10 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
if (task.commitRequested() && task.commitNeeded()) {
task.commit();
committed++;
- log.debug("Committed active task {} per user request in",
task.id());
+ log.debug("Committed stream task {} per user request in",
task.id());
}
} catch (final TaskMigratedException e) {
- log.info("Failed to commit {} since it got migrated to another
thread already. " +
+ log.info("Failed to commit stream task {} since it got
migrated to another thread already. " +
"Closing it as zombie before triggering a new
rebalance.", task.id());
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
@@ -410,9 +410,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
it.remove();
throw e;
} catch (final RuntimeException t) {
- log.error("Failed to commit StreamTask {} due to the following
error:",
- task.id(),
- t);
+ log.error("Failed to commit stream task {} due to the
following error:", task.id(), t);
if (firstException == null) {
firstException = t;
}
@@ -510,6 +508,18 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
suspended.clear();
}
+ @Override
+ public void shutdown(final boolean clean) {
+ // shutdown type is part of the message text, so only the task id sets are passed as {} arguments
+ log.debug((clean ? "Clean" : "Unclean") + " shutdown of all active tasks" + "\n" +
+ "non-initialized tasks to close: {}" + "\n" +
+ "restoring tasks to close: {}" + "\n" +
+ "running tasks to close: {}" + "\n" +
+ "suspended tasks to close: {}",
+ created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
+ super.shutdown(clean);
+ }
+
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8c4c68f..56fefe0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -218,7 +218,7 @@ abstract class AssignedTasks<T extends Task> {
return committed;
}
- void close(final boolean clean) {
+ void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
for (final T task: allTasks()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 55a33c0..81f76a3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -204,6 +204,8 @@ public class StoreChangelogReader implements
ChangelogReader {
restorer.checkpoint(),
restoreToOffsets.get(partition));
restorer.setStartingOffset(restoreConsumer.position(partition));
+
+ log.debug("Calling restorer for partition {} of task {}",
partition, active.restoringTaskFor(partition));
restorer.restoreStarted();
} else {
log.trace("Did not find checkpoint from changelog {} for store
{}, rewinding to beginning.", partition, restorer.storeName());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a2dac40..a9ccbf5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -279,15 +279,12 @@ public class TaskManager {
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
- log.debug("Shutting down all active tasks {}, standby tasks {}, and
suspended tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
- active.suspendedTaskIds());
-
try {
- active.close(clean);
+ active.shutdown(clean);
} catch (final RuntimeException fatalException) {
firstException.compareAndSet(null, fatalException);
}
- standby.close(clean);
+ standby.shutdown(clean);
// remove the changelog partitions from restore consumer
try {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index fd9f0cc..a8f96e4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -546,7 +546,7 @@ public class AssignedStreamsTasksTest {
assignedTasks.initializeNewTasks();
assertNull(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(),
revokedChangelogs));
- assignedTasks.close(true);
+ assignedTasks.shutdown(true);
}
private void addAndInitTask() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index cf33fc4..59cc2a5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -267,7 +267,9 @@ public class StreamTaskTest {
assertTimeoutErrorLog(appender);
// make sure we report the correct message
- assertThat(expected.getMessage(), is("task [0_0] Failed to
initialize task 0_0 due to timeout."));
+ assertThat(
+ expected.getMessage(),
+ is("stream-thread [" + Thread.currentThread().getName() + "]
task [0_0] Failed to initialize task 0_0 due to timeout."));
// make sure we preserve the cause
assertEquals(expected.getCause().getClass(),
TimeoutException.class);
@@ -326,7 +328,9 @@ public class StreamTaskTest {
assertTimeoutErrorLog(appender);
// make sure we report the correct message
- assertThat(expected.getMessage(), is("task [0_0] Failed to
initialize task 0_0 due to timeout."));
+ assertThat(
+ expected.getMessage(),
+ is("stream-thread [" + Thread.currentThread().getName() + "]
task [0_0] Failed to initialize task 0_0 due to timeout."));
// make sure we preserve the cause
assertEquals(expected.getCause().getClass(),
TimeoutException.class);
@@ -338,7 +342,7 @@ public class StreamTaskTest {
private void assertTimeoutErrorLog(final LogCaptureAppender appender) {
final String expectedErrorLogMessage =
- "task [0_0] Timeout exception caught when initializing
transactions for task 0_0. " +
+ "stream-thread [" + Thread.currentThread().getName() + "] task
[0_0] Timeout exception caught when initializing transactions for task 0_0. " +
"This might happen if the broker is slow to respond, if the
network " +
"connection to the broker was interrupted, or if similar
circumstances arise. " +
"You can increase producer parameter `max.block.ms` to
increase this timeout.";
@@ -1758,4 +1762,5 @@ public class StreamTaskTest {
recordValue
);
}
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 73cf768..5b56eb8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1550,8 +1550,8 @@ public class StreamThreadTest {
LogCaptureAppender.unregister(appender);
final List<String> strings = appender.getMessages();
- assertTrue(strings.contains("task [0_1] Skipping record due to
deserialization error. topic=[topic1] partition=[1] offset=[0]"));
- assertTrue(strings.contains("task [0_1] Skipping record due to
deserialization error. topic=[topic1] partition=[1] offset=[1]"));
+ assertTrue(strings.contains("stream-thread [" +
Thread.currentThread().getName() + "] task [0_1] Skipping record due to
deserialization error. topic=[topic1] partition=[1] offset=[0]"));
+ assertTrue(strings.contains("stream-thread [" +
Thread.currentThread().getName() + "] task [0_1] Skipping record due to
deserialization error. topic=[topic1] partition=[1] offset=[1]"));
}
@Test
@@ -1618,33 +1618,35 @@ public class StreamThreadTest {
LogCaptureAppender.unregister(appender);
final List<String> strings = appender.getMessages();
+
+ final String threadTaskPrefix = "stream-thread [" +
Thread.currentThread().getName() + "] task [0_1] ";
assertTrue(strings.contains(
- "task [0_1] Skipping record due to negative extracted timestamp. "
+
+ threadTaskPrefix + "Skipping record due to negative extracted
timestamp. " +
"topic=[topic1] partition=[1] offset=[0]
extractedTimestamp=[-1] " +
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
));
assertTrue(strings.contains(
- "task [0_1] Skipping record due to negative extracted timestamp. "
+
+ threadTaskPrefix + "Skipping record due to negative extracted
timestamp. " +
"topic=[topic1] partition=[1] offset=[1]
extractedTimestamp=[-1] " +
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
));
assertTrue(strings.contains(
- "task [0_1] Skipping record due to negative extracted timestamp. "
+
+ threadTaskPrefix + "Skipping record due to negative extracted
timestamp. " +
"topic=[topic1] partition=[1] offset=[2]
extractedTimestamp=[-1] " +
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
));
assertTrue(strings.contains(
- "task [0_1] Skipping record due to negative extracted timestamp. "
+
+ threadTaskPrefix + "Skipping record due to negative extracted
timestamp. " +
"topic=[topic1] partition=[1] offset=[3]
extractedTimestamp=[-1] " +
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
));
assertTrue(strings.contains(
- "task [0_1] Skipping record due to negative extracted timestamp. "
+
+ threadTaskPrefix + "Skipping record due to negative extracted
timestamp. " +
"topic=[topic1] partition=[1] offset=[4]
extractedTimestamp=[-1] " +
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
));
assertTrue(strings.contains(
- "task [0_1] Skipping record due to negative extracted timestamp. "
+
+ threadTaskPrefix + "Skipping record due to negative extracted
timestamp. " +
"topic=[topic1] partition=[1] offset=[5]
extractedTimestamp=[-1] " +
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index e46a4cc..8e4b2c2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -375,7 +375,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseActiveTasksOnShutdown() {
- active.close(true);
+ active.shutdown(true);
expectLastCall();
replay();
@@ -385,7 +385,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseStandbyTasksOnShutdown() {
- standby.close(false);
+ standby.shutdown(false);
expectLastCall();
replay();