This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7d504d6 HOTFIX: fix NPE in Kafka Streams IQ (#8158)
7d504d6 is described below
commit 7d504d668f266becd3562cbe688f10962c5ca87b
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Feb 23 13:08:14 2020 +0100
HOTFIX: fix NPE in Kafka Streams IQ (#8158)
Reviewers: Vito Jeng <[email protected]>, Guozhang Wang <[email protected]>
---
.../internals/StreamThreadStateStoreProvider.java | 9 ++++-
.../StreamThreadStateStoreProviderTest.java | 44 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 83dd8c9..57d16ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -59,7 +59,14 @@ public class StreamThreadStateStoreProvider {
final Map<TaskId, ? extends Task> tasks =
storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() :
streamThread.activeTaskMap();
final List<T> stores = new ArrayList<>();
if (keyTaskId != null) {
- final T store =
validateAndListStores(tasks.get(keyTaskId).getStore(storeName),
queryableStoreType, storeName, keyTaskId);
+ final Task task = tasks.get(keyTaskId);
+ if (task == null) {
+ throw new InvalidStateStoreException(
+ String.format("The specified partition %d for store %s does not exist.",
+ storeQueryParams.partition(),
+ storeName));
+ }
+ final T store =
validateAndListStores(task.getStore(storeName), queryableStoreType, storeName,
keyTaskId);
if (store != null) {
return Collections.singletonList(store);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index aa14a2e..911d2e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -72,7 +72,9 @@ import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class StreamThreadStateStoreProviderTest {
@@ -291,6 +293,48 @@ public class StreamThreadStateStoreProviderTest {
}
@Test
+ public void shouldReturnSingleStoreForPartition() {
+ mockThread(true);
+ {
+ final List<ReadOnlyKeyValueStore<String, String>> kvStores =
+ provider.stores(
+ StoreQueryParameters
+ .fromNameAndType("kv-store",
QueryableStoreTypes.keyValueStore())
+ .withPartition(0));
+ assertEquals(1, kvStores.size());
+ for (final ReadOnlyKeyValueStore<String, String> store : kvStores)
{
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStore.class)));
+ }
+ }
+ {
+ final List<ReadOnlyKeyValueStore<String, String>> kvStores =
+ provider.stores(
+ StoreQueryParameters
+ .fromNameAndType("kv-store",
QueryableStoreTypes.keyValueStore())
+ .withPartition(1));
+ assertEquals(1, kvStores.size());
+ for (final ReadOnlyKeyValueStore<String, String> store : kvStores)
{
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStore.class)));
+ }
+ }
+ }
+
+ @Test
+ public void shouldThrowForInvalidPartitions() {
+ mockThread(true);
+ final InvalidStateStoreException thrown = assertThrows(
+ InvalidStateStoreException.class,
+ () -> provider.stores(
+ StoreQueryParameters
+ .fromNameAndType("kv-store",
QueryableStoreTypes.keyValueStore())
+ .withPartition(2))
+ );
+ assertThat(thrown.getMessage(), equalTo("The specified partition 2 for store kv-store does not exist."));
+ }
+
+ @Test
public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() {
mockThread(true);
assertEquals(