guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160970925
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -219,13 +219,13 @@ public boolean isActive() {
}
@Override
- public void maybeRecordRestored(final Time time, final long numRecords) {
- maybeRecordSensor(numRecords, time, restoreSensor);
- maybeRecordSensor(-1 * numRecords, time, restoreRemainingSensor);
- }
-
- public void initRemainingRecordsToRestore(final Time time, final long numRecords) {
- maybeRecordSensor(numRecords, time, restoreRemainingSensor);
+ public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {
Review Comment:
This is a minor refactor to make sure all sensor recordings are exposed through
the `Task` interface, so that we do not need to cast to `StreamTask`.
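For context, here is a runnable sketch of what merging the two methods behind one flag could look like. The sensor names and the `record` helper are illustrative stand-ins, not the actual Kafka Streams internals: `initRemaining` seeds the remaining-records count, while a normal call records restored records and decrements the remainder.

```java
import java.util.HashMap;
import java.util.Map;

public class RecordRestorationDemo {
    // stand-in for the metrics sensors: a named accumulator
    static final Map<String, Long> sensors = new HashMap<>();

    static void record(String sensor, long value) {
        sensors.merge(sensor, value, Long::sum);
    }

    // illustrative merge of the two old entry points (maybeRecordRestored and
    // initRemainingRecordsToRestore), switched by the new initRemaining flag
    static void recordRestoration(long numRecords, boolean initRemaining) {
        if (initRemaining) {
            sensors.clear();
            record("restore-remaining", numRecords);
        } else {
            record("restore", numRecords);
            record("restore-remaining", -numRecords);
        }
    }

    public static void main(String[] args) {
        recordRestoration(100, true);  // 100 records left to restore at init
        recordRestoration(40, false);  // a batch of 40 was restored
        System.out.println(sensors.get("restore-remaining")); // 60
    }
}
```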
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -748,7 +748,7 @@ public Set<Task> getTasks() {
@Override
public boolean restoresActiveTasks() {
return !executeWithQueuesLocked(
- () ->
getStreamOfNonPausedTasks().filter(Task::isActive).collect(Collectors.toSet())
+ () ->
getStreamOfTasks().filter(Task::isActive).collect(Collectors.toSet())
Review Comment:
This makes the state-updater case consistent with the case without the state
updater: even when tasks are paused, they are still considered tasks whose
restoration must complete before we can transition the main thread to RUNNING.
See the TODO marker on the interface API below for details.
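A runnable sketch of the behavioral difference (the `Task` record and method names here are illustrative, not the real `DefaultStateUpdater` API): an active task that is paused mid-restoration should still count as restoring, so the thread does not transition to RUNNING prematurely.

```java
import java.util.Collection;
import java.util.List;

public class RestoresActiveTasksDemo {
    record Task(String id, boolean active, boolean paused) {}

    // before the change: paused tasks were filtered out first
    static boolean restoresActiveTasksOld(Collection<Task> tasks) {
        return tasks.stream().filter(t -> !t.paused()).anyMatch(Task::active);
    }

    // after the change: ALL tasks are considered, paused or not
    static boolean restoresActiveTasks(Collection<Task> tasks) {
        return tasks.stream().anyMatch(Task::active);
    }

    public static void main(String[] args) {
        // an active task that is currently paused
        List<Task> tasks = List.of(new Task("0_0", true, true));
        System.out.println(restoresActiveTasksOld(tasks)); // false: could wrongly allow RUNNING
        System.out.println(restoresActiveTasks(tasks));    // true: restoration still pending
    }
}
```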
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,20 +650,19 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
final int numRecords = changelogMetadata.bufferedLimitIndex;
if (numRecords != 0) {
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));
+ final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
Review Comment:
This is fix 2) in the description.
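The change drops the defensive copy: `subList` returns a view of the backing list, so mutations of the returned batch (e.g. clearing it once the batch has been restored, if the surrounding code does so) act directly on the buffer. A small standalone demonstration of the view semantics:

```java
import java.util.ArrayList;
import java.util.List;

public class SubListViewDemo {
    public static void main(String[] args) {
        List<String> buffered = new ArrayList<>(List.of("r0", "r1", "r2", "r3"));

        // subList is a view over the backing list, not an independent copy
        List<String> batch = buffered.subList(0, 2);

        // clearing the view also drops those elements from the backing list
        batch.clear();
        System.out.println(buffered); // [r2, r3]
    }
}
```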
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -749,6 +754,11 @@ private Map<TopicPartition, Long> committedOffsetForChangelogs(final Map<TaskId,
}
}
+ private void filterNewPartitionsToRestore(final Map<TaskId, Task> tasks, final Set<ChangelogMetadata> newPartitionsToRestore) {
Review Comment:
This fixes the issue found before: if a task is paused, we should not
initialize its changelogs, so the restore consumer will not poll its records,
and hence we will never get a task to restore while it is not in the `tasks`
set.
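A minimal sketch of the filtering idea (types and ids are simplified; the real method works on `TaskId` and `ChangelogMetadata`): drop any newly discovered changelog whose owning task is absent from the `tasks` map, e.g. because it is paused.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class FilterNewPartitionsDemo {
    record ChangelogMetadata(String taskId) {}

    // illustrative version of the described filter: keep only changelogs whose
    // owning task is present in the tasks map (paused tasks are absent from it)
    static void filterNewPartitionsToRestore(Map<String, Object> tasks,
                                             Set<ChangelogMetadata> newPartitionsToRestore) {
        newPartitionsToRestore.removeIf(m -> !tasks.containsKey(m.taskId()));
    }

    public static void main(String[] args) {
        Map<String, Object> tasks = Map.of("0_0", new Object()); // task 0_1 is paused
        Set<ChangelogMetadata> pending = new HashSet<>(
            Set.of(new ChangelogMetadata("0_0"), new ChangelogMetadata("0_1")));
        filterNewPartitionsToRestore(tasks, pending);
        System.out.println(pending.size()); // 1: only 0_0's changelog is initialized
    }
}
```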
##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
kafkaStreams.pause();
kafkaStreams.start();
- waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+ waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
Review Comment:
This is mainly for 4) in the description: when we pause the tasks, the state
should not transition to RUNNING. This test did not fail previously because of
the bug we are now fixing in line 823 of StoreChangelogReader above.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +1010,14 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on batch restored", e);
}
+
+ final TaskId taskId = changelogMetadata.stateManager.taskId();
Review Comment:
This brings back the existing code that the previous hot-fix had to remove.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -482,7 +482,7 @@ public void shouldReturnFalseForRestoreActiveTasksIfTaskPaused() throws Exceptio
verifyRemovedTasks();
verifyPausedTasks(task);
- assertFalse(stateUpdater.restoresActiveTasks());
+ assertTrue(stateUpdater.restoresActiveTasks());
Review Comment:
This is related to the change that makes the behavior with and without the
state updater consistent.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -456,15 +456,21 @@ public long restore(final Map<TaskId, Task> tasks) {
// TODO: we always try to restore as a batch when some records are accumulated, which may result in
//       small batches; this can be optimized in the future, e.g. wait longer for larger batches.
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
- try {
- final Task task = tasks.get(taskId);
- final ChangelogMetadata changelogMetadata = changelogs.get(partition);
- totalRestored += restoreChangelog(task, changelogMetadata);
- } catch (final TimeoutException timeoutException) {
- tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
- time.milliseconds(),
- timeoutException
- );
+ final Task task = tasks.get(taskId);
Review Comment:
Similarly, given the fix below in line 757, the task should never be null.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -691,7 +696,7 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
}
}
- if (task != null && (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED))) {
+ if (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
Review Comment:
Given the fix in 757 below, the task should never be null.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -880,6 +896,9 @@ private void initializeChangelogs(final Map<TaskId, Task> tasks,
}
private void addChangelogsToRestoreConsumer(final Set<TopicPartition> partitions) {
+ if (partitions.isEmpty())
Review Comment:
This and line 929 below reduce the verbose log lines KafkaConsumer emits when
it calls `unsubscribe`: if the partitions set is empty while the current
assignment is also empty, we should not proceed at all.
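The guard pattern in isolation (the `assign` stand-in and counters are illustrative, not the real restore-consumer API): returning early when there is nothing to add means the consumer is never touched, so it logs nothing.

```java
import java.util.HashSet;
import java.util.Set;

public class UnsubscribeGuardDemo {
    static int consumerCalls = 0; // counts (verbosely logged) consumer re-assignments

    // stand-in for the restore consumer's re-assignment call
    static void assign(Set<String> partitions) {
        consumerCalls++;
    }

    static void addPartitions(Set<String> current, Set<String> toAdd) {
        if (toAdd.isEmpty())
            return; // guard: nothing to add, skip the consumer call entirely
        current.addAll(toAdd);
        assign(current);
    }

    public static void main(String[] args) {
        Set<String> assignment = new HashSet<>();
        addPartitions(assignment, Set.of());       // guarded: no consumer call
        addPartitions(assignment, Set.of("t-0"));  // proceeds as before
        System.out.println(consumerCalls); // 1
    }
}
```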
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java:
##########
@@ -266,12 +264,21 @@ public void shouldGetDroppedRecordsSensor() {
try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock =
mockStatic(StreamsMetricsImpl.class)) {
final Sensor sensor = TaskMetrics.droppedRecordsSensor(THREAD_ID, TASK_ID, streamsMetrics);
streamsMetricsStaticMock.verify(
- () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ () -> StreamsMetricsImpl.addInvocationRateToSensor(
Review Comment:
This is related to 3) in the description.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]