This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 788793dee6f KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener (#13179) 788793dee6f is described below commit 788793dee6fa5d7ba5cb7d756b72c7d043dc8789 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Feb 7 11:33:09 2023 -0800 KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener (#13179) 1. Add the new API (default impl is empty) to StateRestoreListener. 2. Update related unit tests Reviewers: Lucas Brutschy <lucas...@users.noreply.github.com>, Matthias J. Sax <mj...@apache.org> --- .../streams/processor/StateRestoreListener.java | 22 ++++ .../processor/internals/StoreChangelogReader.java | 15 +++ .../internals/StoreChangelogReaderTest.java | 133 +++++++++++++++++++-- .../kafka/test/MockStateRestoreListener.java | 10 ++ 4 files changed, 173 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java index 6ba794f187b..006cc58cd43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java @@ -22,13 +22,16 @@ import org.apache.kafka.common.TopicPartition; /** * Class for listening to various states of the restoration process of a StateStore. * + * <p> * When calling {@link org.apache.kafka.streams.KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener)} * the passed instance is expected to be stateless since the {@code StateRestoreListener} is shared * across all {@link org.apache.kafka.streams.processor.internals.StreamThread} instances. * + * <p> * Users desiring stateful operations will need to provide synchronization internally in * the {@code StateRestorerListener} implementation. 
* + * <p> * Note that this listener is only registered at the per-client level and users can base on the {@code storeName} * parameter to define specific monitoring for different {@link StateStore}s. There is another * {@link StateRestoreCallback} interface which is registered via the @@ -37,6 +40,12 @@ import org.apache.kafka.common.TopicPartition; * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. * + * <p> + * Also note that the update process of standby tasks is not monitored via this interface, since a standby task does + * not actually <i>restore</i> state, but keeps updating its state from the changelogs written by the active task, + * a process which does not ever finish. + * + * <p> * Incremental updates are exposed so users can estimate how much progress has been made. */ public interface StateRestoreListener { @@ -85,4 +94,17 @@ public interface StateRestoreListener { final String storeName, final long totalRestored); + /** + * Method called when restoring the {@link StateStore} is suspended due to the task being migrated out of the host. + * If the migrated task is recycled or re-assigned back to the current host, another + * {@link #onRestoreStart(TopicPartition, String, long, long)} would be called. 
+ * + * @param topicPartition the {@link TopicPartition} containing the values to restore + * @param storeName the name of the store whose restoration was suspended + * @param totalRestored the total number of records restored for this TopicPartition before being suspended + */ + default void onRestoreSuspended(final TopicPartition topicPartition, + final String storeName, + final long totalRestored) { + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 874f1993c19..be580f3575c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -986,8 +986,23 @@ public class StoreChangelogReader implements ChangelogReader { for (final TopicPartition partition : revokedChangelogs) { final ChangelogMetadata changelogMetadata = changelogs.remove(partition); if (changelogMetadata != null) { + // if the changelog is still in REGISTERED, it means it has not been initialized and has not started + // restoring yet, and hence we should not try to remove the changelog partition if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) { revokedInitializedChangelogs.add(partition); + + // if the changelog is not in RESTORING, it means either + // the corresponding onRestoreStart was not called, or the restoration has already completed; in these cases + // we should not call onRestoreSuspended either + if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE && + changelogMetadata.state().equals(ChangelogState.RESTORING)) { + try { + final String storeName = changelogMetadata.storeMetadata.store().name(); + stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored); + } catch (final Exception e) { + throw new StreamsException("State restore listener failed on restore paused", e); + } + } } 
changelogMetadata.clear(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index fbd0db99a0d..3ceaed80210 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -72,6 +72,7 @@ import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE; import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END; +import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_SUSPENDED; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; @@ -198,10 +199,83 @@ public class StoreChangelogReaderTest extends EasyMockSupport { } @Test - public void shouldInitializeChangelogAndCheckForCompletion() { + public void shouldSupportUnregisterChangelogBeforeInitialization() { + final Map<TaskId, Task> mockTasks = mock(Map.class); + EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); + EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); + EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); + EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + + adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); + + final StoreChangelogReader changelogReader = + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); + + changelogReader.register(tp, stateManager); + + if (type == STANDBY) { + changelogReader.transitToUpdateStandby(); + } + + 
changelogReader.unregister(Collections.singleton(tp)); + + assertEquals(Collections.emptySet(), consumer.assignment()); + + assertNull(callback.restoreTopicPartition); + assertNull(callback.storeNameCalledStates.get(RESTORE_START)); + assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED)); + assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); + } + + @Test + public void shouldSupportUnregisterChangelogBeforeCompletion() { final Map<TaskId, Task> mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); + EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); + EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + + adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); + + final StoreChangelogReader changelogReader = + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); + + changelogReader.register(tp, stateManager); + + if (type == STANDBY) { + changelogReader.transitToUpdateStandby(); + } + + changelogReader.restore(mockTasks); + + assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); + assertEquals(Collections.emptySet(), changelogReader.completedChangelogs()); + assertEquals(10L, consumer.position(tp)); + assertEquals(Collections.emptySet(), consumer.paused()); + assertEquals(Collections.singleton(tp), consumer.assignment()); + + changelogReader.unregister(Collections.singleton(tp)); + + assertEquals(Collections.emptySet(), consumer.assignment()); + + if (type == ACTIVE) { + assertEquals(tp, callback.restoreTopicPartition); + assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START)); + assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_SUSPENDED)); + } else { + assertNull(callback.restoreTopicPartition); + assertNull(callback.storeNameCalledStates.get(RESTORE_START)); + 
assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED)); + } + assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); + } + + @Test + public void shouldSupportUnregisterChangelogAfterCompletion() { + final Map<TaskId, Task> mockTasks = mock(Map.class); + EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); + EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); + EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); EasyMock.replay(mockTasks, stateManager, storeMetadata, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -209,20 +283,65 @@ public class StoreChangelogReaderTest extends EasyMockSupport { final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); + changelogReader.register(tp, stateManager); + + if (type == STANDBY) { + changelogReader.transitToUpdateStandby(); + } + + changelogReader.restore(mockTasks); + + assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); + assertEquals(10L, consumer.position(tp)); + + assertEquals(Collections.singleton(tp), consumer.assignment()); + if (type == ACTIVE) { + assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs()); + assertEquals(Collections.singleton(tp), consumer.paused()); + } else { + assertEquals(Collections.emptySet(), changelogReader.completedChangelogs()); + assertEquals(Collections.emptySet(), consumer.paused()); + } + + changelogReader.unregister(Collections.singleton(tp)); + + assertEquals(Collections.emptySet(), consumer.assignment()); + + if (type == ACTIVE) { + assertEquals(tp, callback.restoreTopicPartition); + assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START)); + assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END)); + assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED)); + 
assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); + } + } + + @Test + public void shouldInitializeChangelogAndCheckForCompletion() { + final Map<TaskId, Task> mockTasks = mock(Map.class); + EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); + EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); + EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + + final StoreChangelogReader changelogReader = + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); + changelogReader.register(tp, stateManager); changelogReader.restore(mockTasks); assertEquals( - type == ACTIVE ? - StoreChangelogReader.ChangelogState.COMPLETED : - StoreChangelogReader.ChangelogState.RESTORING, - changelogReader.changelogMetadata(tp).state() + type == ACTIVE ? + StoreChangelogReader.ChangelogState.COMPLETED : + StoreChangelogReader.ChangelogState.RESTORING, + changelogReader.changelogMetadata(tp).state() ); assertEquals(type == ACTIVE ? 10L : null, changelogReader.changelogMetadata(tp).endOffset()); assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); assertEquals( - type == ACTIVE ? Collections.singleton(tp) : Collections.emptySet(), - changelogReader.completedChangelogs() + type == ACTIVE ? 
Collections.singleton(tp) : Collections.emptySet(), + changelogReader.completedChangelogs() ); assertEquals(10L, consumer.position(tp)); assertEquals(Collections.singleton(tp), consumer.paused()); diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java index 10269699d04..6c423a48705 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java @@ -37,6 +37,7 @@ public class MockStateRestoreListener implements StateRestoreListener { public static final String RESTORE_START = "restore_start"; public static final String RESTORE_BATCH = "restore_batch"; public static final String RESTORE_END = "restore_end"; + public static final String RESTORE_SUSPENDED = "restore_suspended"; @Override public void onRestoreStart(final TopicPartition topicPartition, @@ -69,6 +70,15 @@ public class MockStateRestoreListener implements StateRestoreListener { totalNumRestored = totalRestored; } + @Override + public void onRestoreSuspended(final TopicPartition topicPartition, + final String storeName, + final long totalRestored) { + restoreTopicPartition = topicPartition; + storeNameCalledStates.put(RESTORE_SUSPENDED, storeName); + totalNumRestored = totalRestored; + } + @Override public String toString() { return "MockStateRestoreListener{" +