This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d504d6  HOTFIX: fix NPE in Kafka Streams IQ (#8158)
7d504d6 is described below

commit 7d504d668f266becd3562cbe688f10962c5ca87b
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Feb 23 13:08:14 2020 +0100

    HOTFIX: fix NPE in Kafka Streams IQ (#8158)
    
    Reviewers: Vito Jeng <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../internals/StreamThreadStateStoreProvider.java  |  9 ++++-
 .../StreamThreadStateStoreProviderTest.java        | 44 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 83dd8c9..57d16ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -59,7 +59,14 @@ public class StreamThreadStateStoreProvider {
             final Map<TaskId, ? extends Task> tasks = 
storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : 
streamThread.activeTaskMap();
             final List<T> stores = new ArrayList<>();
             if (keyTaskId != null) {
-                final T store = 
validateAndListStores(tasks.get(keyTaskId).getStore(storeName), 
queryableStoreType, storeName, keyTaskId);
+                final Task task = tasks.get(keyTaskId);
+                if (task == null) {
+                    throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s 
does not exist.",
+                            storeQueryParams.partition(),
+                            storeName));
+                }
+                final T store = 
validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, 
keyTaskId);
                 if (store != null) {
                     return Collections.singletonList(store);
                 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index aa14a2e..911d2e3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -72,7 +72,9 @@ import java.util.Set;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class StreamThreadStateStoreProviderTest {
 
@@ -291,6 +293,48 @@ public class StreamThreadStateStoreProviderTest {
     }
 
     @Test
+    public void shouldReturnSingleStoreForPartition() {
+        mockThread(true);
+        {
+            final List<ReadOnlyKeyValueStore<String, String>> kvStores =
+                provider.stores(
+                    StoreQueryParameters
+                        .fromNameAndType("kv-store", 
QueryableStoreTypes.keyValueStore())
+                        .withPartition(0));
+            assertEquals(1, kvStores.size());
+            for (final ReadOnlyKeyValueStore<String, String> store : kvStores) 
{
+                assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+                assertThat(store, 
not(instanceOf(TimestampedKeyValueStore.class)));
+            }
+        }
+        {
+            final List<ReadOnlyKeyValueStore<String, String>> kvStores =
+                provider.stores(
+                    StoreQueryParameters
+                        .fromNameAndType("kv-store", 
QueryableStoreTypes.keyValueStore())
+                        .withPartition(1));
+            assertEquals(1, kvStores.size());
+            for (final ReadOnlyKeyValueStore<String, String> store : kvStores) 
{
+                assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+                assertThat(store, 
not(instanceOf(TimestampedKeyValueStore.class)));
+            }
+        }
+    }
+
+    @Test
+    public void shouldThrowForInvalidPartitions() {
+        mockThread(true);
+        final InvalidStateStoreException thrown = assertThrows(
+            InvalidStateStoreException.class,
+            () -> provider.stores(
+                StoreQueryParameters
+                    .fromNameAndType("kv-store", 
QueryableStoreTypes.keyValueStore())
+                    .withPartition(2))
+        );
+        assertThat(thrown.getMessage(), equalTo("The specified partition 2 for 
store kv-store does not exist."));
+    }
+
+    @Test
     public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() {
         mockThread(true);
         assertEquals(

Reply via email to