This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 2a8fc8d KAFKA-10271: Performance regression while fetching a key from a single partition (#9020) 2a8fc8d is described below commit 2a8fc8dd74a753ee108258185791322e1cfc4b81 Author: Dima Reznik <dim...@fiverr.com> AuthorDate: Thu Oct 8 20:12:33 2020 +0300 KAFKA-10271: Performance regression while fetching a key from a single partition (#9020) StreamThreadStateStoreProvider excessively loops over calls to internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation for the caller, especially when the store has many partitions. Reviewers: John Roesler <vvcep...@apache.org>, Guozhang Wang <wangg...@gmail.com> --- .../org/apache/kafka/streams/KafkaStreams.java | 2 +- .../state/internals/QueryableStoreProvider.java | 20 ------- .../internals/StreamThreadStateStoreProvider.java | 67 ++++++++-------------- .../state/internals/WrappingStoreProvider.java | 17 +++++- .../integration/EosBetaUpgradeIntegrationTest.java | 1 + .../internals/QueryableStoreProviderTest.java | 12 ++-- .../StreamThreadStateStoreProviderTest.java | 2 +- .../state/internals/WrappingStoreProviderTest.java | 44 +++++++++----- .../apache/kafka/test/StateStoreProviderStub.java | 2 +- 9 files changed, 78 insertions(+), 89 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index b3900c2..b26c6e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -784,7 +784,7 @@ public class KafkaStreams implements AutoCloseable { delegatingStateRestoreListener, i + 1); threadState.put(threads[i].getId(), threads[i].state()); - storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder)); + storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } 
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 2af5874..8dd1f03 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.StoreQueryParameters; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; @@ -56,25 +55,6 @@ public class QueryableStoreProvider { if (!globalStore.isEmpty()) { return queryableStoreType.create(globalStoreProvider, storeName); } - final List<T> allStores = new ArrayList<>(); - for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { - final List<T> stores = storeProvider.stores(storeQueryParameters); - if (!stores.isEmpty()) { - allStores.addAll(stores); - if (storeQueryParameters.partition() != null) { - break; - } - } - } - if (allStores.isEmpty()) { - if (storeQueryParameters.partition() != null) { - throw new InvalidStateStoreException( - String.format("The specified partition %d for store %s does not exist.", - storeQueryParameters.partition(), - storeName)); - } - throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); - } return queryableStoreType.create( new WrappingStoreProvider(storeProviders, storeQueryParameters), storeName diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 7cc263a..d5a175d 
100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -20,7 +20,6 @@ import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.QueryableStoreType; @@ -28,54 +27,46 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { private final StreamThread streamThread; - private final InternalTopologyBuilder internalTopologyBuilder; - public StreamThreadStateStoreProvider(final StreamThread streamThread, - final InternalTopologyBuilder internalTopologyBuilder) { + public StreamThreadStateStoreProvider(final StreamThread streamThread) { this.streamThread = streamThread; - this.internalTopologyBuilder = internalTopologyBuilder; } @SuppressWarnings("unchecked") public <T> List<T> stores(final StoreQueryParameters storeQueryParams) { final String storeName = storeQueryParams.storeName(); final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType(); - final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition()); if 
(streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { - final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); - final List<T> stores = new ArrayList<>(); - if (keyTaskId != null) { - final Task task = tasks.get(keyTaskId); - if (task == null) { - return Collections.emptyList(); - } - final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId); - if (store != null) { - return Collections.singletonList(store); - } + final Collection<Task> tasks = storeQueryParams.staleStoresEnabled() ? + streamThread.allTasks().values() : streamThread.activeTasks(); + + if (storeQueryParams.partition() != null) { + return findStreamTask(tasks, storeName, storeQueryParams.partition()). + map(streamTask -> + validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + map(Collections::singletonList). + orElse(Collections.emptyList()); } else { - for (final Task streamTask : tasks.values()) { - final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); - if (store != null) { - stores.add(store); - } - } + return tasks.stream(). + map(streamTask -> + validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + filter(Objects::nonNull). 
+ collect(Collectors.toList()); } - return stores; } else { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + state + ", not RUNNING" + @@ -104,19 +95,11 @@ public class StreamThreadStateStoreProvider { } } - private TaskId createKeyTaskId(final String storeName, final Integer partition) { - if (partition == null) { - return null; - } - final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName); - final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics); - final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups(); - for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) { - if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) { - return new TaskId(topicGroup.getKey(), partition); - } - } - throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + - partition + " is not available on this instance"); + private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) { + return tasks.stream(). + filter(streamTask -> streamTask.id().partition == partition && + streamTask.getStore(storeName) != null && + storeName.equals(streamTask.getStore(storeName).name())). 
+ findFirst(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 5c9ae1a..26c5db0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -46,11 +46,22 @@ public class WrappingStoreProvider implements StateStoreProvider { public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { final List<T> allStores = new ArrayList<>(); - for (final StreamThreadStateStoreProvider provider : storeProviders) { - final List<T> stores = provider.stores(storeQueryParameters); - allStores.addAll(stores); + for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { + final List<T> stores = storeProvider.stores(storeQueryParameters); + if (!stores.isEmpty()) { + allStores.addAll(stores); + if (storeQueryParameters.partition() != null) { + break; + } + } } if (allStores.isEmpty()) { + if (storeQueryParameters.partition() != null) { + throw new InvalidStateStoreException( + String.format("The specified partition %d for store %s does not exist.", + storeQueryParameters.partition(), + storeName)); + } throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); } return allStores; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index cd57acb..d039d98 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -1080,6 +1080,7 @@ public class EosBetaUpgradeIntegrationTest { streams, 
StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()) ); + waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () -> "State store did not ready: " + storeName); final Set<Long> keys = new HashSet<>(); try (final KeyValueIterator<Long, Long> it = store.all()) { while (it.hasNext()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java index 2d04755..f2ca0c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java @@ -60,12 +60,12 @@ public class QueryableStoreProviderTest { @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfKVStoreDoesntExist() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1"); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfWindowStoreDoesntExist() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()); } @Test @@ -80,12 +80,12 @@ public class QueryableStoreProviderTest { @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())).get("1"); } 
@Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()); } @Test @@ -106,7 +106,7 @@ public class QueryableStoreProviderTest { storeProvider.getStore( StoreQueryParameters .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()) - .withPartition(partition)) + .withPartition(partition)).get("1") ); assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore))); } @@ -123,7 +123,7 @@ public class QueryableStoreProviderTest { storeProvider.getStore( StoreQueryParameters .fromNameAndType(windowStore, QueryableStoreTypes.windowStore()) - .withPartition(partition)) + .withPartition(partition)).fetch("1", System.currentTimeMillis()) ); assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 9dac534..2494575 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -161,7 +161,7 @@ public class StreamThreadStateStoreProviderTest { tasks.put(new TaskId(0, 1), taskTwo); threadMock = EasyMock.createNiceMock(StreamThread.class); - provider = new StreamThreadStateStoreProvider(threadMock, internalTopologyBuilder); + provider = new StreamThreadStateStoreProvider(threadMock); } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index ceb3f79..1897048 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -39,26 +39,24 @@ public class WrappingStoreProviderTest { private WrappingStoreProvider wrappingStoreProvider; + private final int numStateStorePartitions = 2; + @Before public void before() { final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - - stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(String.class)) - .build()); - stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(String.class)) - .build()); - stubProviderTwo.addStore("window", new NoOpWindowStore()); - wrappingStoreProvider = new WrappingStoreProvider( - Arrays.asList(stubProviderOne, stubProviderTwo), - StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()) - ); + for (int partition = 0; partition < numStateStorePartitions; partition++) { + stubProviderOne.addStore("kv", partition, Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), + Serdes.serdeFrom(String.class), + Serdes.serdeFrom(String.class)) + .build()); + stubProviderOne.addStore("window", partition, new NoOpWindowStore()); + wrappingStoreProvider = new WrappingStoreProvider( + Arrays.asList(stubProviderOne, stubProviderTwo), + StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()) + ); + } } @Test @@ 
-82,4 +80,20 @@ public class WrappingStoreProviderTest { wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore())); wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()); } + + @Test + public void shouldReturnAllStoreWhenQueryWithoutPartition() { + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore())); + final List<ReadOnlyKeyValueStore<String, String>> results = + wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()); + assertEquals(numStateStorePartitions, results.size()); + } + + @Test + public void shouldReturnSingleStoreWhenQueryWithPartition() { + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1)); + final List<ReadOnlyKeyValueStore<String, String>> results = + wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()); + assertEquals(1, results.size()); + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java index bc0e33a..9d89ae2 100644 --- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java +++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java @@ -39,7 +39,7 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider { private final int defaultStorePartition = 0; public StateStoreProviderStub(final boolean throwException) { - super(null, null); + super(null); this.throwException = throwException; }