This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2fad1652942 KAFKA-10199: Add task updater metrics, part 1 (#13228) 2fad1652942 is described below commit 2fad1652942226454a44038f2350642817f9f74b Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Fri Feb 24 10:25:11 2023 -0800 KAFKA-10199: Add task updater metrics, part 1 (#13228) * Moved pausing-tasks logic out of the commit-interval loop to be on the top-level loop, similar to resuming tasks. * Added thread-level restoration metrics. * Related unit tests. Reviewers: Lucas Brutschy <lucas...@users.noreply.github.com>, Matthias J. Sax <matth...@confluent.io> --- .../processor/internals/ChangelogReader.java | 10 +- .../processor/internals/DefaultStateUpdater.java | 230 +++++++++++++++++++-- .../processor/internals/StoreChangelogReader.java | 94 +++++---- .../streams/processor/internals/StreamThread.java | 5 +- .../streams/processor/internals/TaskManager.java | 4 +- .../internals/metrics/StreamsMetricsImpl.java | 5 + .../internals/DefaultStateUpdaterTest.java | 151 ++++++++++++-- .../processor/internals/MockChangelogReader.java | 8 +- .../processor/internals/StreamThreadTest.java | 2 +- 9 files changed, 427 insertions(+), 82 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 03199d294ca..1cf8ef628da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -28,8 +28,10 @@ import java.util.Set; public interface ChangelogReader extends ChangelogRegister { /** * Restore all registered state stores by reading from their changelogs + * + * @return the total number of records restored in this call */ - void restore(final Map<TaskId, Task> tasks); + long restore(final Map<TaskId, Task> tasks); /** * Transit to restore active changelogs mode @@ -41,6 +43,12 @@ public interface ChangelogReader extends ChangelogRegister { */ void transitToUpdateStandby(); + /** + * @return true if the reader is in restoring active changelog mode; + * false if the reader is in updating standby changelog mode + */ + boolean isRestoringActive(); + /** * @return the changelog partitions that have been completed restoring */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index ae6618c304f..5e912c99a5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -16,8 +16,15 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -32,7 +39,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,6 +60,10 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; + public class DefaultStateUpdater implements StateUpdater { private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " + @@ -59,14 +72,20 @@ public class DefaultStateUpdater implements StateUpdater { private class StateUpdaterThread extends Thread { private final ChangelogReader changelogReader; + private final StateUpdaterMetrics updaterMetrics; private final AtomicBoolean isRunning = new AtomicBoolean(true); private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>(); private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>(); private final Logger log; - public StateUpdaterThread(final String name, final ChangelogReader changelogReader) { + private long totalCheckpointLatency = 0L; + + public StateUpdaterThread(final String name, + final Metrics metrics, + final ChangelogReader changelogReader) { super(name); this.changelogReader = changelogReader; + this.updaterMetrics = new StateUpdaterMetrics(metrics, name); final String logPrefix = String.format("state-updater [%s] ", name); final LogContext logContext = new LogContext(logPrefix); @@ -92,6 +111,30 @@ public class DefaultStateUpdater implements StateUpdater { return pausedTasks.values(); } + public long getNumUpdatingStandbyTasks() { + return updatingTasks.values().stream() + .filter(t -> !t.isActive()) + .count(); + } + + public long getNumRestoringActiveTasks() { + return updatingTasks.values().stream() + .filter(Task::isActive) + .count(); + } + + public long getNumPausedStandbyTasks() { + return pausedTasks.values().stream() + .filter(t -> !t.isActive()) + .count(); + } + + public long getNumPausedActiveTasks() { + return pausedTasks.values().stream() + .filter(Task::isActive) + .count(); + } + @Override public void run() { log.info("State updater thread started"); @@ -109,17 +152,41 @@ public class DefaultStateUpdater implements StateUpdater { Thread.interrupted(); // Clear the interrupted flag. removeAddedTasksFromInputQueue(); removeUpdatingAndPausedTasks(); + updaterMetrics.clear(); shutdownGate.countDown(); log.info("State updater thread shutdown"); } } + // In each iteration: + // 1) check if updating tasks need to be paused + // 2) check if paused tasks need to be resumed + // 3) restore those updating tasks + // 4) checkpoint those updating task states + // 5) idle waiting if there is no more tasks to be restored + // + // Note that, 1-3) are measured as restoring time, while 4) and 5) measured separately + // as checkpointing time and idle time private void runOnce() throws InterruptedException { + final long totalStartTimeMs = time.milliseconds(); performActionsOnTasks(); + resumeTasks(); - restoreTasks(); - checkAllUpdatingTaskStates(time.milliseconds()); + pauseTasks(); + restoreTasks(totalStartTimeMs); + + final long checkpointStartTimeMs = time.milliseconds(); + maybeCheckpointTasks(checkpointStartTimeMs); + + final long waitStartTimeMs = time.milliseconds(); + waitIfAllChangelogsCompletelyRead(); + + final long endTimeMs = time.milliseconds(); + final long totalWaitTime = Math.max(0L, endTimeMs - waitStartTimeMs); + final long totalTime = Math.max(0L, endTimeMs - totalStartTimeMs); + + recordMetrics(endTimeMs, totalTime, totalWaitTime); } private void performActionsOnTasks() { @@ -151,9 +218,18 @@ public class DefaultStateUpdater implements StateUpdater { } } - private void restoreTasks() { + private void pauseTasks() { + for (final Task task : updatingTasks.values()) { + if (topologyMetadata.isPaused(task.id().topologyName())) { + pauseTask(task); + } + } + } + + private void restoreTasks(final long now) { try { - changelogReader.restore(updatingTasks); + final long restored = changelogReader.restore(updatingTasks); + updaterMetrics.restoreSensor.record(restored, now); } catch (final TaskCorruptedException taskCorruptedException) { handleTaskCorruptedException(taskCorruptedException); } catch (final StreamsException streamsException) { @@ -193,7 +269,7 @@ public class DefaultStateUpdater implements StateUpdater { task.markChangelogAsCorrupted(task.changelogPartitions()); // we need to enforce a checkpoint that removes the corrupted partitions - task.maybeCheckpoint(true); + measureCheckpointLatency(() -> task.maybeCheckpoint(true)); } private void handleStreamsException(final StreamsException streamsException) { @@ -251,10 +327,10 @@ public class DefaultStateUpdater implements StateUpdater { private void removeUpdatingAndPausedTasks() { changelogReader.clear(); - updatingTasks.forEach((id, task) -> { + measureCheckpointLatency(() -> updatingTasks.forEach((id, task) -> { task.maybeCheckpoint(true); removedTasks.add(task); - }); + })); updatingTasks.clear(); pausedTasks.forEach((id, task) -> { removedTasks.add(task); @@ -313,7 +389,7 @@ public class DefaultStateUpdater implements StateUpdater { final Task task; if (updatingTasks.containsKey(taskId)) { task = updatingTasks.get(taskId); - task.maybeCheckpoint(true); + measureCheckpointLatency(() -> task.maybeCheckpoint(true)); final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); changelogReader.unregister(changelogPartitions); removedTasks.add(task); @@ -339,7 +415,7 @@ public class DefaultStateUpdater implements StateUpdater { private void pauseTask(final Task task) { final TaskId taskId = task.id(); // do not need to unregister changelog partitions for paused tasks - task.maybeCheckpoint(true); + measureCheckpointLatency(() -> task.maybeCheckpoint(true)); pausedTasks.put(taskId, task); updatingTasks.remove(taskId); if (task.isActive()) { @@ -373,7 +449,7 @@ public class DefaultStateUpdater implements StateUpdater { final Set<TopicPartition> restoredChangelogs) { final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); if (restoredChangelogs.containsAll(changelogPartitions)) { - task.maybeCheckpoint(true); + measureCheckpointLatency(() -> task.maybeCheckpoint(true)); changelogReader.unregister(changelogPartitions); addToRestoredTasks(task); updatingTasks.remove(task.id()); @@ -399,31 +475,55 @@ public class DefaultStateUpdater implements StateUpdater { } } - private void checkAllUpdatingTaskStates(final long now) { + private void maybeCheckpointTasks(final long now) { final long elapsedMsSinceLastCommit = now - lastCommitMs; if (elapsedMsSinceLastCommit > commitIntervalMs) { if (log.isDebugEnabled()) { - log.debug("Checking all restoring task states since {}ms has elapsed (commit interval is {}ms)", + log.debug("Checkpointing state of all restoring tasks since {}ms has elapsed (commit interval is {}ms)", elapsedMsSinceLastCommit, commitIntervalMs); } - for (final Task task : updatingTasks.values()) { - if (topologyMetadata.isPaused(task.id().topologyName())) { - pauseTask(task); - } else { - log.debug("Try to checkpoint current restoring progress for task {}", task.id()); + measureCheckpointLatency(() -> { + for (final Task task : updatingTasks.values()) { // do not enforce checkpointing during restoration if its position has not advanced much task.maybeCheckpoint(false); } - } + }); lastCommitMs = now; } } + + private void measureCheckpointLatency(final Runnable actionToMeasure) { + final long startMs = time.milliseconds(); + try { + actionToMeasure.run(); + } finally { + totalCheckpointLatency += Math.max(0L, time.milliseconds() - startMs); + } + } + + private void recordMetrics(final long now, final long totalLatency, final long totalWaitLatency) { + final long totalRestoreLatency = Math.max(0L, totalLatency - totalWaitLatency - totalCheckpointLatency); + + updaterMetrics.idleRatioSensor.record((double) totalWaitLatency / totalLatency, now); + updaterMetrics.checkpointRatioSensor.record((double) totalCheckpointLatency / totalLatency, now); + + if (changelogReader.isRestoringActive()) { + updaterMetrics.activeRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); + updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now); + } else { + updaterMetrics.standbyRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); + updaterMetrics.activeRestoreRatioSensor.record(0.0d, now); + } + + totalCheckpointLatency = 0L; + } } private final Time time; private final String name; + private final Metrics metrics; private final ChangelogReader changelogReader; private final TopologyMetadata topologyMetadata; private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>(); @@ -443,12 +543,14 @@ public class DefaultStateUpdater implements StateUpdater { private CountDownLatch shutdownGate; public DefaultStateUpdater(final String name, + final Metrics metrics, final StreamsConfig config, final ChangelogReader changelogReader, final TopologyMetadata topologyMetadata, final Time time) { this.time = time; this.name = name; + this.metrics = metrics; this.changelogReader = changelogReader; this.topologyMetadata = topologyMetadata; this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); @@ -456,7 +558,7 @@ public class DefaultStateUpdater implements StateUpdater { public void start() { if (stateUpdaterThread == null) { - stateUpdaterThread = new StateUpdaterThread(name, changelogReader); + stateUpdaterThread = new StateUpdaterThread(name, metrics, changelogReader); stateUpdaterThread.start(); shutdownGate = new CountDownLatch(1); @@ -686,4 +788,92 @@ public class DefaultStateUpdater implements StateUpdater { exceptionsAndFailedTasks.stream().flatMap(exceptionAndTasks -> exceptionAndTasks.getTasks().stream()), removedTasks.stream())))); } + + private class StateUpdaterMetrics { + private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; + + private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "being idle"; + private static final String RESTORE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "restoring active tasks"; + private static final String UPDATE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "updating standby tasks"; + private static final String CHECKPOINT_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "checkpointing tasks restored progress"; + private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored"; + private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered"; + + private final Sensor restoreSensor; + private final Sensor idleRatioSensor; + private final Sensor activeRestoreRatioSensor; + private final Sensor standbyRestoreRatioSensor; + private final Sensor checkpointRatioSensor; + + private final Deque<String> allSensorNames = new LinkedList<>(); + private final Deque<MetricName> allMetricNames = new LinkedList<>(); + + private StateUpdaterMetrics(final Metrics metrics, final String threadId) { + final Map<String, String> threadLevelTags = new LinkedHashMap<>(); + threadLevelTags.put(THREAD_ID_TAG, threadId); + + MetricName metricName = metrics.metricName("active-restoring-tasks", + STATE_LEVEL_GROUP, + "The number of active tasks currently undergoing restoration", + threadLevelTags); + metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + stateUpdaterThread.getNumRestoringActiveTasks() : 0); + allMetricNames.push(metricName); + + metricName = metrics.metricName("standby-updating-tasks", + STATE_LEVEL_GROUP, + "The number of standby tasks currently undergoing state update", + threadLevelTags); + metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + stateUpdaterThread.getNumUpdatingStandbyTasks() : 0); + allMetricNames.push(metricName); + + metricName = metrics.metricName("active-paused-tasks", + STATE_LEVEL_GROUP, + "The number of active tasks paused restoring", + threadLevelTags); + metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + stateUpdaterThread.getNumPausedActiveTasks() : 0); + allMetricNames.push(metricName); + + metricName = metrics.metricName("standby-paused-tasks", + STATE_LEVEL_GROUP, + "The number of standby tasks paused state update", + threadLevelTags); + metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + stateUpdaterThread.getNumPausedStandbyTasks() : 0); + allMetricNames.push(metricName); + + this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO); + this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + allSensorNames.add("idle-ratio"); + + this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO); + this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + allSensorNames.add("active-restore-ratio"); + + this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO); + this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + allSensorNames.add("standby-update-ratio"); + + this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO); + this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + allSensorNames.add("checkpoint-ratio"); + + this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO); + this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate()); + this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount())); + allSensorNames.add("restore-records"); + } + + void clear() { + while (!allSensorNames.isEmpty()) { + metrics.removeSensor(allSensorNames.pop()); + } + + while (!allMetricNames.isEmpty()) { + metrics.removeMetric(allMetricNames.pop()); + } + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index be580f3575c..701ecb9b567 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -330,6 +330,11 @@ public class StoreChangelogReader implements ChangelogReader { state = ChangelogReaderState.STANDBY_UPDATING; } + @Override + public boolean isRestoringActive() { + return state == ChangelogReaderState.ACTIVE_RESTORING; + } + /** * Since it is shared for multiple tasks and hence multiple state managers, the registration would take its * corresponding state manager as well for restoring. @@ -423,49 +428,22 @@ public class StoreChangelogReader implements ChangelogReader { // 2. if all changelogs have finished, return early; // 3. if there are any restoring changelogs, try to read from the restore consumer and process them. @Override - public void restore(final Map<TaskId, Task> tasks) { - - // If we are updating only standby tasks, and are not using a separate thread, we should - // use a non-blocking poll to unblock the processing as soon as possible. - final boolean useNonBlockingPoll = state == ChangelogReaderState.STANDBY_UPDATING && !stateUpdaterEnabled; - + public long restore(final Map<TaskId, Task> tasks) { initializeChangelogs(tasks, registeredChangelogs()); if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) { throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs"); } + long totalRestored = 0L; if (allChangelogsCompleted()) { log.debug("Finished restoring all changelogs {}", changelogs.keySet()); - return; + return totalRestored; } final Set<TopicPartition> restoringChangelogs = restoringChangelogs(); if (!restoringChangelogs.isEmpty()) { - final ConsumerRecords<byte[], byte[]> polledRecords; - - try { - pauseResumePartitions(tasks, restoringChangelogs); - - polledRecords = restoreConsumer.poll(useNonBlockingPoll ? Duration.ZERO : pollTime); - - // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ? - // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ? - } catch (final InvalidOffsetException e) { - log.warn("Encountered " + e.getClass().getName() + - " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " + - "the consumer's position has fallen out of the topic partition offset range because the topic was " + - "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" + - " it later.", e); - - final Set<TaskId> corruptedTasks = new HashSet<>(); - e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId())); - throw new TaskCorruptedException(corruptedTasks, e); - } catch (final InterruptException interruptException) { - throw interruptException; - } catch (final KafkaException e) { - throw new StreamsException("Restore consumer get unexpected error polling records.", e); - } + final ConsumerRecords<byte[], byte[]> polledRecords = pollRecordsFromRestoreConsumer(tasks, restoringChangelogs); for (final TopicPartition partition : polledRecords.partitions()) { bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition)); @@ -479,12 +457,15 @@ public class StoreChangelogReader implements ChangelogReader { // small batches; this can be optimized in the future, e.g. wait longer for larger batches. final TaskId taskId = changelogs.get(partition).stateManager.taskId(); try { - if (restoreChangelog(changelogs.get(partition))) { + final ChangelogMetadata changelogMetadata = changelogs.get(partition); + final int restored = restoreChangelog(changelogMetadata); + if (restored > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) { final Task task = tasks.get(taskId); if (task != null) { task.clearTaskTimeout(); } } + totalRestored += restored; } catch (final TimeoutException timeoutException) { tasks.get(taskId).maybeInitTaskTimeoutOrThrow( time.milliseconds(), @@ -497,6 +478,41 @@ public class StoreChangelogReader implements ChangelogReader { maybeLogRestorationProgress(); } + + return totalRestored; + } + + private ConsumerRecords<byte[], byte[]> pollRecordsFromRestoreConsumer(final Map<TaskId, Task> tasks, + final Set<TopicPartition> restoringChangelogs) { + // If we are updating only standby tasks, and are not using a separate thread, we should + // use a non-blocking poll to unblock the processing as soon as possible. + final boolean useNonBlockingPoll = state == ChangelogReaderState.STANDBY_UPDATING && !stateUpdaterEnabled; + final ConsumerRecords<byte[], byte[]> polledRecords; + + try { + pauseResumePartitions(tasks, restoringChangelogs); + + polledRecords = restoreConsumer.poll(useNonBlockingPoll ? Duration.ZERO : pollTime); + + // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ? + // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ? + } catch (final InvalidOffsetException e) { + log.warn("Encountered " + e.getClass().getName() + + " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " + + "the consumer's position has fallen out of the topic partition offset range because the topic was " + + "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing " + + "it later.", e); + + final Set<TaskId> corruptedTasks = new HashSet<>(); + e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId())); + throw new TaskCorruptedException(corruptedTasks, e); + } catch (final InterruptException interruptException) { + throw interruptException; + } catch (final KafkaException e) { + throw new StreamsException("Restore consumer get unexpected error polling records.", e); + } + + return polledRecords; } private void pauseResumePartitions(final Map<TaskId, Task> tasks, @@ -623,19 +639,17 @@ public class StoreChangelogReader implements ChangelogReader { /** * restore a changelog with its buffered records if there's any; for active changelogs also check if * it has completed the restoration and can transit to COMPLETED state and trigger restore callbacks + * + * @return number of records restored */ - private boolean restoreChangelog(final ChangelogMetadata changelogMetadata) { + private int restoreChangelog(final ChangelogMetadata changelogMetadata) { final ProcessorStateManager stateManager = changelogMetadata.stateManager; final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata; final TopicPartition partition = storeMetadata.changelogPartition(); final String storeName = storeMetadata.store().name(); final int numRecords = changelogMetadata.bufferedLimitIndex; - boolean madeProgress = false; - if (numRecords != 0) { - madeProgress = true; - final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords); stateManager.restore(storeMetadata, records); @@ -650,7 +664,7 @@ public class StoreChangelogReader implements ChangelogReader { final Long currentOffset = storeMetadata.offset(); log.trace("Restored {} records from changelog {} to store {}, end offset is {}, current offset is {}", - partition, storeName, numRecords, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset); + numRecords, partition, storeName, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset); changelogMetadata.bufferedLimitIndex = 0; changelogMetadata.totalRestored += numRecords; @@ -667,8 +681,6 @@ public class StoreChangelogReader implements ChangelogReader { // we should check even if there's nothing restored, but do not check completed if we are processing standby tasks if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE && hasRestoredToEnd(changelogMetadata)) { - madeProgress = true; - log.info("Finished restoring changelog {} to store {} with a total number of {} records", partition, storeName, changelogMetadata.totalRestored); @@ -682,7 +694,7 @@ public class StoreChangelogReader implements ChangelogReader { } } - return madeProgress; + return numRecords; } private Set<Task> getTasksFromPartitions(final Map<TaskId, Task> tasks, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 02bd74a027d..1f2a91d27b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -404,7 +404,7 @@ public class StreamThread extends Thread { topologyMetadata, adminClient, stateDirectory, - maybeCreateAndStartStateUpdater(stateUpdaterEnabled, config, changelogReader, topologyMetadata, time, clientId, threadIdx) + maybeCreateAndStartStateUpdater(stateUpdaterEnabled, streamsMetrics, config, changelogReader, topologyMetadata, time, clientId, threadIdx) ); referenceContainer.taskManager = taskManager; @@ -448,6 +448,7 @@ public class StreamThread extends Thread { } private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled, + final StreamsMetricsImpl streamsMetrics, final StreamsConfig streamsConfig, final ChangelogReader changelogReader, final TopologyMetadata topologyMetadata, @@ -456,7 +457,7 @@ public class StreamThread extends Thread { final int threadIdx) { if (stateUpdaterEnabled) { final String name = clientId + "-StateUpdater-" + threadIdx; - final StateUpdater stateUpdater = new DefaultStateUpdater(name, streamsConfig, changelogReader, topologyMetadata, time); + final StateUpdater stateUpdater = new DefaultStateUpdater(name, streamsMetrics.metricsRegistry(), streamsConfig, changelogReader, topologyMetadata, time); stateUpdater.start(); return stateUpdater; } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index cf83cb27eba..c2f3b5253ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1552,7 +1552,9 @@ public class TaskManager { /** * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With * state updater enabled, this does not return any tasks currently owned by the state updater. - * @return + * + * TODO: after we complete switching to state updater, we could rename this function as allRunningTasks + * to be differentiated from allTasks including running and restoring tasks */ Map<TaskId, Task> allOwnedTasks() { // not bothering with an unmodifiable map, since the tasks themselves are mutable, but diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 3260bfc1b82..f3cd0982eaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -146,6 +146,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String OPERATIONS = " operations"; public static final String TOTAL_DESCRIPTION = "The total number of "; public static final String RATE_DESCRIPTION = "The average per-second number of "; + public static final String RATIO_DESCRIPTION = "The fraction of time the thread spent on "; public static final String AVG_LATENCY_DESCRIPTION = "The average latency of "; public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of "; public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; @@ -177,6 +178,10 @@ public class StreamsMetricsImpl implements StreamsMetrics { return version; } + public Metrics metricsRegistry() { + return metrics; + } + public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() { return rocksDBMetricsRecordingTrigger; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index b0c0ba7c156..b3407114209 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -17,7 +17,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -26,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTasks; import org.apache.kafka.streams.processor.internals.Task.State; +import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.InOrder; @@ -35,6 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -51,6 +55,10 @@ import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.apache.kafka.test.StreamsTestUtils.TopologyMetadataBuilder.unnamedTopology; import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -58,7 +66,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -76,6 +84,7 @@ class DefaultStateUpdaterTest { private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0); private final static TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1); private final static TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0); + private final static TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1); private final static TopicPartition TOPIC_PARTITION_C_0 = new TopicPartition("topicC", 0); private final static TopicPartition TOPIC_PARTITION_D_0 = new TopicPartition("topicD", 0); private final static TaskId TASK_0_0 = new TaskId(0, 0); @@ -84,14 +93,18 @@ class DefaultStateUpdaterTest { private final static TaskId TASK_1_0 = new TaskId(1, 0); private final static TaskId TASK_1_1 = new TaskId(1, 1); private final static TaskId TASK_A_0_0 = new TaskId(0, 0, "A"); + private final static TaskId TASK_A_0_1 = new TaskId(0, 1, "A"); private final static TaskId TASK_B_0_0 = new TaskId(0, 0, "B"); + private final static TaskId TASK_B_0_1 = new TaskId(0, 1, "B"); // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); + private final Metrics metrics = new Metrics(time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); - private DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", config, changelogReader, topologyMetadata, time); + private DefaultStateUpdater stateUpdater = + new DefaultStateUpdater("test-state-updater", metrics, config, changelogReader, topologyMetadata, time); @AfterEach public void tearDown() { @@ -149,7 +162,7 @@ class DefaultStateUpdaterTest { @Test public void shouldRemoveUpdatingTasksOnShutdown() throws Exception { stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); - stateUpdater = new DefaultStateUpdater("test-state-updater", new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, topologyMetadata, time); + stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, topologyMetadata, time); final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); @@ -643,7 +656,7 @@ class DefaultStateUpdaterTest { mkEntry(activeTask2.id(), activeTask2), mkEntry(standbyTask.id(), standbyTask) ); - doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks1); + doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks1); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -670,7 +683,7 @@ class DefaultStateUpdaterTest { final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(task1.id())); final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task1), taskCorruptedException); when(changelogReader.allChangelogsCompleted()).thenReturn(false); - doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks); + doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks); stateUpdater.start(); stateUpdater.add(task1); @@ -828,7 +841,7 @@ class DefaultStateUpdaterTest { mkEntry(controlTask.id(), controlTask) ); doThrow(streamsException) - .doNothing() + .doReturn(0L) .when(changelogReader).restore(updatingTasks); stateUpdater.start(); @@ -970,7 +983,7 @@ class DefaultStateUpdaterTest { mkEntry(controlTask.id(), controlTask) ); doThrow(streamsException) - .doNothing() + .doReturn(0L) .when(changelogReader).restore(updatingTasks); stateUpdater.start(); @@ -1133,7 +1146,7 @@ class DefaultStateUpdaterTest { mkEntry(controlTask.id(), controlTask) ); doThrow(streamsException) - .doNothing() + .doReturn(0L) .when(changelogReader).restore(updatingTasks); stateUpdater.start(); @@ -1184,7 +1197,7 @@ class DefaultStateUpdaterTest { mkEntry(task1.id(), task1), mkEntry(task2.id(), task2) ); - doNothing().doThrow(streamsException).when(changelogReader).restore(updatingTasks); + doReturn(0L).doThrow(streamsException).when(changelogReader).restore(updatingTasks); stateUpdater.start(); stateUpdater.add(task1); @@ -1215,10 +1228,10 @@ class DefaultStateUpdaterTest { mkEntry(task2.id(), task2), mkEntry(task3.id(), task3) ); - doNothing() + doReturn(0L) .doThrow(streamsException1) .when(changelogReader).restore(updatingTasksBeforeFirstThrow); - doNothing() + doReturn(0L) .doThrow(streamsException2) .when(changelogReader).restore(updatingTasksBeforeSecondThrow); stateUpdater.start(); @@ -1248,7 +1261,7 @@ class DefaultStateUpdaterTest { mkEntry(task2.id(), task2), mkEntry(task3.id(), task3) ); - doNothing().doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks); + doReturn(0L).doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks); stateUpdater.start(); stateUpdater.add(task1); @@ -1362,7 +1375,7 @@ class DefaultStateUpdaterTest { public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() { // we need to use a non auto-ticking timer here to control how much time elapsed exactly final Time time = new MockTime(); - final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", config, changelogReader, topologyMetadata, time); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, config, changelogReader, topologyMetadata, time); try { final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); @@ -1466,11 +1479,11 @@ class DefaultStateUpdaterTest { mkEntry(standbyTask1.id(), standbyTask1), mkEntry(standbyTask2.id(), standbyTask2) ); - doNothing().doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks1); + doReturn(0L).doThrow(taskCorruptedException).doReturn(0L).when(changelogReader).restore(updatingTasks1); final Map<TaskId, Task> updatingTasks2 = mkMap( mkEntry(activeTask1.id(), activeTask1) ); - doNothing().doThrow(streamsException).doNothing().when(changelogReader).restore(updatingTasks2); + doReturn(0L).doThrow(streamsException).doReturn(0L).when(changelogReader).restore(updatingTasks2); stateUpdater.start(); stateUpdater.add(standbyTask1); stateUpdater.add(activeTask1); @@ -1526,6 +1539,114 @@ class DefaultStateUpdaterTest { verifyGetTasks(mkSet(activeTask), mkSet(standbyTask)); } + @Test + public void shouldRecordMetrics() throws Exception { + final StreamTask activeTask1 = statefulTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask activeTask2 = statefulTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask3 = standbyTask(TASK_A_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build(); + final StandbyTask standbyTask4 = standbyTask(TASK_B_0_1, mkSet(TOPIC_PARTITION_B_1)).inState(State.RUNNING).build(); + final Map<TaskId, Task> tasks1234 = mkMap( + mkEntry(activeTask1.id(), activeTask1), + mkEntry(activeTask2.id(), activeTask2), + mkEntry(standbyTask3.id(), standbyTask3), + mkEntry(standbyTask4.id(), standbyTask4) + ); + final Map<TaskId, Task> tasks13 = mkMap( + mkEntry(activeTask1.id(), activeTask1), + mkEntry(standbyTask3.id(), standbyTask3) + ); + + when(topologyMetadata.isPaused("B")).thenReturn(true); + when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + when(changelogReader.restore(tasks1234)).thenReturn(1L); + when(changelogReader.restore(tasks13)).thenReturn(1L); + when(changelogReader.isRestoringActive()).thenReturn(true); + stateUpdater.start(); + stateUpdater.add(activeTask1); + stateUpdater.add(activeTask2); + stateUpdater.add(standbyTask3); + stateUpdater.add(standbyTask4); + + verifyPausedTasks(activeTask2, standbyTask4); + assertThat(metrics.metrics().size(), is(11)); + + final Map<String, String> tagMap = new LinkedHashMap<>(); + tagMap.put("thread-id", "test-state-updater"); + + MetricName metricName = new MetricName("active-restoring-tasks", + "stream-state-updater-metrics", + "The number of active tasks currently undergoing restoration", + tagMap); + verifyMetric(metrics, metricName, is(1.0)); + + metricName = new MetricName("standby-updating-tasks", + "stream-state-updater-metrics", + "The number of standby tasks currently undergoing state update", + tagMap); + verifyMetric(metrics, metricName, is(1.0)); + + metricName = new MetricName("active-paused-tasks", + "stream-state-updater-metrics", + "The number of active tasks paused restoring", + tagMap); + verifyMetric(metrics, metricName, is(1.0)); + + metricName = new MetricName("standby-paused-tasks", + "stream-state-updater-metrics", + "The number of standby tasks paused state update", + tagMap); + verifyMetric(metrics, metricName, is(1.0)); + + metricName = new MetricName("idle-ratio", + "stream-state-updater-metrics", + "The fraction of time the thread spent on being idle", + tagMap); + verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); + + metricName = new MetricName("active-restore-ratio", + "stream-state-updater-metrics", + "The fraction of time the thread spent on restoring active tasks", + tagMap); + verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); + + metricName = new MetricName("standby-update-ratio", + "stream-state-updater-metrics", + "The fraction of time the thread spent on updating standby tasks", + tagMap); + verifyMetric(metrics, metricName, is(0.0d)); + + metricName = new MetricName("checkpoint-ratio", + "stream-state-updater-metrics", + "The fraction of time the thread spent on checkpointing tasks restored progress", + tagMap); + verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); + + metricName = new MetricName("restore-records-rate", + "stream-state-updater-metrics", + "The average per-second number of records restored", + tagMap); + verifyMetric(metrics, metricName, not(0.0d)); + + metricName = new MetricName("restore-call-rate", + "stream-state-updater-metrics", + "The average per-second number of restore calls triggered", + tagMap); + verifyMetric(metrics, metricName, not(0.0d)); + + stateUpdater.shutdown(Duration.ofMinutes(1)); + assertThat(metrics.metrics().size(), is(1)); + } + + @SuppressWarnings("unchecked") + private static <T> void verifyMetric(final Metrics metrics, + final MetricName metricName, + final Matcher<T> matcher) { + assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description())); + assertThat(metrics.metrics().get(metricName).metricName().tags(), is(metricName.tags())); + assertThat((T) metrics.metrics().get(metricName).metricValue(), matcher); + } + private void verifyGetTasks(final Set<StreamTask> expectedActiveTasks, final Set<StandbyTask> expectedStandbyTasks) { final Set<Task> tasks = stateUpdater.getTasks(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java index a70420cb44b..8d0f8c7a6b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java @@ -46,8 +46,9 @@ public class MockChangelogReader implements ChangelogReader { } @Override - public void restore(final Map<TaskId, Task> tasks) { + public long restore(final Map<TaskId, Task> tasks) { // do nothing + return 0L; } @Override @@ -60,6 +61,11 @@ public class MockChangelogReader implements ChangelogReader { // do nothing } + @Override + public boolean isRestoringActive() { + return true; + } + @Override public Set<TopicPartition> completedChangelogs() { // assuming all restoring partitions are completed diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index a8768ddd3eb..6050de77be0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1245,7 +1245,7 @@ public class StreamThreadTest { final ChangelogReader changelogReader = new MockChangelogReader() { @Override - public void restore(final Map<TaskId, Task> tasks) { + public long restore(final Map<TaskId, Task> tasks) { consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0]));