Repository: kafka
Updated Branches:
  refs/heads/trunk 70afd5f9d -> 3d74196f2


KAFKA-4163: NPE in StreamsMetadataState during re-balance operations

During rebalance operations the Cluster object gets set to Cluster.empty(). 
This can result in NPEs when doing certain operations on StreamsMetadataState. 
This should throw a StreamsException if the Cluster is empty as it is not yet 
(re-)initialized.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1845 from dguy/streams-meta-hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d74196f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d74196f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d74196f

Branch: refs/heads/trunk
Commit: 3d74196f205c53946b6fc3dd0501aa2095f0031a
Parents: 70afd5f
Author: Damian Guy <damian....@gmail.com>
Authored: Mon Sep 19 11:00:53 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Sep 19 11:00:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  8 ++-
 .../internals/StreamsMetadataState.java         | 23 ++++++++-
 .../kafka/streams/state/StreamsMetadata.java    |  9 ++++
 .../CompositeReadOnlyKeyValueStore.java         | 24 +++++++--
 .../internals/CompositeReadOnlyWindowStore.java | 15 ++++--
 .../state/internals/QueryableStoreProvider.java |  2 +-
 .../StreamThreadStateStoreProvider.java         |  4 +-
 .../state/internals/WrappingStoreProvider.java  |  3 +-
 .../QueryableStateIntegrationTest.java          | 45 ++++++++--------
 .../internals/StreamsMetadataStateTest.java     |  7 +++
 .../CompositeReadOnlyKeyValueStoreTest.java     | 21 ++++----
 .../CompositeReadOnlyWindowStoreTest.java       | 19 ++++++-
 .../internals/QueryableStoreProviderTest.java   |  3 +-
 .../internals/ReadOnlyWindowStoreStub.java      | 11 +++-
 .../state/internals/StateStoreProviderStub.java | 43 ----------------
 .../internals/WrappingStoreProviderTest.java    |  5 +-
 .../kafka/test/StateStoreProviderStub.java      | 54 ++++++++++++++++++++
 17 files changed, 194 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4fabdf7..d88d09e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -329,7 +329,8 @@ public class KafkaStreams {
      * @param key               Key to use to for partition
      * @param keySerializer     Serializer for the key
      * @param <K>               key type
-     * @return  The {@link StreamsMetadata} for the storeName and key
+     * @return  The {@link StreamsMetadata} for the storeName and key or 
{@link StreamsMetadata#NOT_AVAILABLE}
+     * if streams is (re-)initializing
      */
     public <K> StreamsMetadata metadataForKey(final String storeName,
                                               final K key,
@@ -350,7 +351,8 @@ public class KafkaStreams {
      * @param key               Key to use to for partition
      * @param partitioner       Partitioner for the store
      * @param <K>               key type
-     * @return  The {@link StreamsMetadata} for the storeName and key
+     * @return  The {@link StreamsMetadata} for the storeName and key or 
{@link StreamsMetadata#NOT_AVAILABLE}
+     * if streams is (re-)initializing
      */
     public <K> StreamsMetadata metadataForKey(final String storeName,
                                               final K key,
@@ -368,6 +370,8 @@ public class KafkaStreams {
      * @param queryableStoreType    accept only stores that are accepted by 
{@link QueryableStoreType#accepts(StateStore)}
      * @param <T>                   return type
      * @return  A facade wrapping the {@link 
org.apache.kafka.streams.processor.StateStore} instances
+     * @throws org.apache.kafka.streams.errors.InvalidStateStoreException if 
the streams are (re-)initializing or
+     * a store with storeName and queryableStoreType doesn't exist.
      */
     public <T> T store(final String storeName, final QueryableStoreType<T> 
queryableStoreType) {
         validateIsRunning();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index eeb3bc9..6f9bea6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -70,6 +70,10 @@ public class StreamsMetadataState {
     public synchronized Collection<StreamsMetadata> 
getAllMetadataForStore(final String storeName) {
         Objects.requireNonNull(storeName, "storeName cannot be null");
 
+        if (!isInitialized()) {
+            return Collections.emptyList();
+        }
+
         final Set<String> sourceTopics = 
builder.stateStoreNameToSourceTopics().get(storeName);
         if (sourceTopics == null) {
             return Collections.emptyList();
@@ -96,7 +100,8 @@ public class StreamsMetadataState {
      * @param key           Key to use
      * @param keySerializer Serializer for the key
      * @param <K>           key type
-     * @return The {@link StreamsMetadata} for the storeName and key
+     * @return The {@link StreamsMetadata} for the storeName and key or {@link 
StreamsMetadata#NOT_AVAILABLE}
+     * if streams is (re-)initializing
      */
     public synchronized <K> StreamsMetadata getMetadataWithKey(final String 
storeName,
                                                                final K key,
@@ -105,10 +110,15 @@ public class StreamsMetadataState {
         Objects.requireNonNull(storeName, "storeName can't be null");
         Objects.requireNonNull(key, "key can't be null");
 
+        if (!isInitialized()) {
+            return StreamsMetadata.NOT_AVAILABLE;
+        }
+
         final SourceTopicsInfo sourceTopicsInfo = 
getSourceTopicsInfo(storeName);
         if (sourceTopicsInfo == null) {
             return null;
         }
+
         return getStreamsMetadataForKey(storeName,
                                         key,
                                         new 
DefaultStreamPartitioner<>(keySerializer,
@@ -131,7 +141,8 @@ public class StreamsMetadataState {
      * @param key         Key to use
      * @param partitioner partitioner to use to find correct partition for key
      * @param <K>         key type
-     * @return The {@link StreamsMetadata} for the storeName and key
+     * @return The {@link StreamsMetadata} for the storeName and key or {@link 
StreamsMetadata#NOT_AVAILABLE}
+     * if streams is (re-)initializing
      */
     public synchronized <K> StreamsMetadata getMetadataWithKey(final String 
storeName,
                                                                final K key,
@@ -140,6 +151,10 @@ public class StreamsMetadataState {
         Objects.requireNonNull(key, "key can't be null");
         Objects.requireNonNull(partitioner, "partitioner can't be null");
 
+        if (!isInitialized()) {
+            return StreamsMetadata.NOT_AVAILABLE;
+        }
+
         SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
         if (sourceTopicsInfo == null) {
             return null;
@@ -218,6 +233,10 @@ public class StreamsMetadataState {
         return new SourceTopicsInfo(sourceTopics);
     }
 
+    private boolean isInitialized() {
+        return !clusterMetadata.topics().isEmpty();
+    }
+
     private class SourceTopicsInfo {
         private final Set<String> sourceTopics;
         private int maxPartitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java 
b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index 541221f..9602bfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KafkaStreams;
 
+import java.util.Collections;
 import java.util.Set;
 
 /**
@@ -29,6 +30,14 @@ import java.util.Set;
  * NOTE: This is a point in time view. It may change when rebalances happen.
  */
 public class StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. 
This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadata NOT_AVAILABLE = new 
StreamsMetadata(new HostInfo("unavailable", -1),
+                                                                            
Collections.<String>emptySet(),
+                                                                            
Collections.<TopicPartition>emptySet());
+
     private final HostInfo hostInfo;
     private final Set<String> stateStoreNames;
     private final Set<TopicPartition> topicPartitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 5ed1d35..5c47419 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -48,10 +49,15 @@ public class CompositeReadOnlyKeyValueStore<K, V> 
implements ReadOnlyKeyValueSto
     public V get(final K key) {
         final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
         for (ReadOnlyKeyValueStore<K, V> store : stores) {
-            V result = store.get(key);
-            if (result != null) {
-                return result;
+            try {
+                final V result = store.get(key);
+                if (result != null) {
+                    return result;
+                }
+            } catch (InvalidStateStoreException e) {
+                throw new InvalidStateStoreException("State store is not 
available anymore and may have been migrated to another instance; please 
re-discover its location from the state metadata.");
             }
+
         }
         return null;
     }
@@ -61,7 +67,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements 
ReadOnlyKeyValueSto
         final NextIteratorFunction<K, V> nextIteratorFunction = new 
NextIteratorFunction<K, V>() {
             @Override
             public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, 
V> store) {
-                return store.range(from, to);
+                try {
+                    return store.range(from, to);
+                } catch (InvalidStateStoreException e) {
+                    throw new InvalidStateStoreException("State store is not 
available anymore and may have been migrated to another instance; please 
re-discover its location from the state metadata.");
+                }
             }
         };
         final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
@@ -73,7 +83,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements 
ReadOnlyKeyValueSto
         final NextIteratorFunction<K, V> nextIteratorFunction = new 
NextIteratorFunction<K, V>() {
             @Override
             public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, 
V> store) {
-                return store.all();
+                try {
+                    return store.all();
+                } catch (InvalidStateStoreException e) {
+                    throw new InvalidStateStoreException("State store is not 
available anymore and may have been migrated to another instance; please 
re-discover its location from the state metadata.");
+                }
             }
         };
         final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 9f58e17..b33c0f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -44,11 +45,15 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
         final List<ReadOnlyWindowStore<K, V>> stores = 
provider.stores(storeName, windowStoreType);
         for (ReadOnlyWindowStore<K, V> windowStore : stores) {
-            final WindowStoreIterator<V> result = windowStore.fetch(key, 
timeFrom, timeTo);
-            if (!result.hasNext()) {
-                result.close();
-            } else {
-                return result;
+            try {
+                final WindowStoreIterator<V> result = windowStore.fetch(key, 
timeFrom, timeTo);
+                if (!result.hasNext()) {
+                    result.close();
+                } else {
+                    return result;
+                }
+            } catch (InvalidStateStoreException e) {
+                throw new InvalidStateStoreException("State store is not 
available anymore and may have been migrated to another instance; please 
re-discover its location from the state metadata.");
             }
         }
         return new WindowStoreIterator<V>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 640761c..64dac1f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -46,7 +46,7 @@ public class QueryableStoreProvider {
             allStores.addAll(storeProvider.stores(storeName, 
queryableStoreType));
         }
         if (allStores.isEmpty()) {
-            throw new InvalidStateStoreException("Store: " + storeName + " is 
currently not available");
+            throw new InvalidStateStoreException("the state store, " + 
storeName + ", may have migrated to another instance.");
         }
         return queryableStoreType.create(
                 new WrappingStoreProvider(storeProviders),

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index e761ed0..3a50a68 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -39,14 +39,14 @@ public class StreamThreadStateStoreProvider implements 
StateStoreProvider {
     @Override
     public <T> List<T> stores(final String storeName, final 
QueryableStoreType<T> queryableStoreType) {
         if (!streamThread.isInitialized()) {
-            throw new InvalidStateStoreException("Store: " + storeName + " is 
currently not available as the stream thread has not (re-)initialized yet");
+            throw new InvalidStateStoreException("the state store, " + 
storeName + ", may have migrated to another instance.");
         }
         final List<T> stores = new ArrayList<>();
         for (StreamTask streamTask : streamThread.tasks().values()) {
             final StateStore store = streamTask.getStore(storeName);
             if (store != null && queryableStoreType.accepts(store)) {
                 if (!store.isOpen()) {
-                    throw new InvalidStateStoreException("Store: " + storeName 
+ " isn't isOpen");
+                    throw new InvalidStateStoreException("the state store, " + 
storeName + ", may have migrated to another instance.");
                 }
                 stores.add((T) store);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 1672112..eb1bc64 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -48,8 +48,7 @@ public class WrappingStoreProvider implements 
StateStoreProvider {
             allStores.addAll(stores);
         }
         if (allStores.isEmpty()) {
-            throw new InvalidStateStoreException("Store " + storeName + " is 
currently "
-                                                 + "unavailable");
+            throw new InvalidStateStoreException("the state store, " + 
storeName + ", may have migrated to another instance.");
         }
         return allStores;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 67eb4a7..e6d7be8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -74,8 +74,6 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 
-
-
 @RunWith(Parameterized.class)
 public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -265,24 +263,23 @@ public class QueryableStateIntegrationTest {
             TestUtils.waitForCondition(new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    final StreamsMetadata metadata = 
streams.metadataForKey(storeName, key, new StringSerializer());
-                    if (metadata == null) {
-                        return false;
-                    }
-                    final int index = metadata.hostInfo().port();
-                    final KafkaStreams streamsWithKey = 
streamRunnables[index].getStream();
-                    final ReadOnlyKeyValueStore<String, Long> store;
                     try {
-                        store = streamsWithKey.store(storeName, 
QueryableStoreTypes.<String, Long>keyValueStore());
-                    } catch (final InvalidStateStoreException e) {
-                        // rebalance
-                        return false;
+                        final StreamsMetadata metadata = 
streams.metadataForKey(storeName, key, new StringSerializer());
+                        if (metadata == null) {
+                            return false;
+                        }
+                        final int index = metadata.hostInfo().port();
+                        final KafkaStreams streamsWithKey = 
streamRunnables[index].getStream();
+                        final ReadOnlyKeyValueStore<String, Long> store = 
streamsWithKey.store(storeName, QueryableStoreTypes.<String, 
Long>keyValueStore());
+                        return store != null && store.get(key) != null;
                     } catch (final IllegalStateException e) {
                         // Kafka Streams instance may have closed but 
rebalance hasn't happened
                         return false;
-                    } 
+                    } catch (final InvalidStateStoreException e) {
+                        // rebalance
+                        return false;
+                    }
 
-                    return store != null && store.get(key) != null;
                 }
             }, 30000, "waiting for metadata, store and value to be non null");
         }
@@ -296,15 +293,15 @@ public class QueryableStateIntegrationTest {
             TestUtils.waitForCondition(new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    final StreamsMetadata metadata = 
streams.metadataForKey(storeName, key, new StringSerializer());
-                    if (metadata == null) {
-                        return false;
-                    }
-                    final int index = metadata.hostInfo().port();
-                    final KafkaStreams streamsWithKey = 
streamRunnables[index].getStream();
-                    final ReadOnlyWindowStore<String, Long> store;
                     try {
-                        store = streamsWithKey.store(storeName, 
QueryableStoreTypes.<String, Long>windowStore());
+                        final StreamsMetadata metadata = 
streams.metadataForKey(storeName, key, new StringSerializer());
+                        if (metadata == null) {
+                            return false;
+                        }
+                        final int index = metadata.hostInfo().port();
+                        final KafkaStreams streamsWithKey = 
streamRunnables[index].getStream();
+                        final ReadOnlyWindowStore<String, Long> store = 
streamsWithKey.store(storeName, QueryableStoreTypes.<String, 
Long>windowStore());
+                        return store != null && store.fetch(key, from, to) != 
null;
                     } catch (final IllegalStateException e) {
                         // Kafka Streams instance may have closed but 
rebalance hasn't happened
                         return false;
@@ -312,7 +309,7 @@ public class QueryableStateIntegrationTest {
                         // rebalance
                         return false;
                     }
-                    return store != null && store.fetch(key, from, to) != null;
+
                 }
             }, 30000, "waiting for metadata, store and value to be non null");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index d110277..03280a8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -210,6 +210,13 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
+    public void shouldReturnNotAvailableWhenClusterIsEmpty() throws Exception {
+        discovery.onChange(Collections.<HostInfo, 
Set<TopicPartition>>emptyMap(), Cluster.empty());
+        final StreamsMetadata result = 
discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
+        assertEquals(StreamsMetadata.NOT_AVAILABLE, result);
+    }
+
+    @Test
     public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
         final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, 
topic2P2));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index cdc5ac7..05c32f0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -18,6 +18,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,8 +43,8 @@ public class CompositeReadOnlyKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     @Before
     public void before() {
-        final StateStoreProviderStub stubProviderOne = new 
StateStoreProviderStub();
-        stubProviderTwo = new StateStoreProviderStub();
+        final StateStoreProviderStub stubProviderOne = new 
StateStoreProviderStub(false);
+        stubProviderTwo = new StateStoreProviderStub(false);
 
         stubOneUnderlying = newStoreInstance();
         stubProviderOne.addStore(storeName, stubOneUnderlying);
@@ -148,19 +149,19 @@ public class CompositeReadOnlyKeyValueStoreTest {
     }
 
     @Test(expected = InvalidStateStoreException.class)
-    public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnGet() throws 
Exception {
-        noStores().get("anything");
+    public void shouldThrowInvalidStoreExceptionDuringRebalance() throws 
Exception {
+        rebalancing().get("anything");
     }
 
 
     @Test(expected = InvalidStateStoreException.class)
-    public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnRange() 
throws Exception {
-        noStores().range("anything", "something");
+    public void shouldThrowInvalidStoreExceptionOnRangeDuringRebalance() 
throws Exception {
+        rebalancing().range("anything", "something");
     }
 
     @Test(expected = InvalidStateStoreException.class)
-    public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnAll() throws 
Exception {
-        noStores().all();
+    public void shouldThrowInvalidStoreExceptionOnAllDuringRebalance() throws 
Exception {
+        rebalancing().all();
     }
 
     @Test
@@ -192,8 +193,8 @@ public class CompositeReadOnlyKeyValueStoreTest {
         assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries());
     }
 
-    private CompositeReadOnlyKeyValueStore<Object, Object> noStores() {
-        return new CompositeReadOnlyKeyValueStore<>(new 
WrappingStoreProvider(Collections.<StateStoreProvider>emptyList()),
+    private CompositeReadOnlyKeyValueStore<Object, Object> rebalancing() {
+        return new CompositeReadOnlyKeyValueStore<>(new 
WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(new 
StateStoreProviderStub(true))),
                 QueryableStoreTypes.keyValueStore(), storeName);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 825c1e8..d098429 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -15,8 +15,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,8 +44,8 @@ public class CompositeReadOnlyWindowStoreTest {
 
     @Before
     public void before() {
-        stubProviderOne = new StateStoreProviderStub();
-        stubProviderTwo = new StateStoreProviderStub();
+        stubProviderOne = new StateStoreProviderStub(false);
+        stubProviderTwo = new StateStoreProviderStub(false);
         underlyingWindowStore = new ReadOnlyWindowStoreStub<>();
         stubProviderOne.addStore(storeName, underlyingWindowStore);
 
@@ -103,6 +105,19 @@ public class CompositeReadOnlyWindowStoreTest {
         assertEquals(Collections.singletonList(new KeyValue<>(1L, 
"my-value")), results);
     }
 
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws 
Exception {
+        final CompositeReadOnlyWindowStore<Object, Object> store = new 
CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(true), 
QueryableStoreTypes.windowStore(), "foo");
+        store.fetch("key", 1, 10);
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws 
Exception {
+        underlyingWindowStore.setOpen(false);
+        underlyingWindowStore.fetch("key", 1, 10);
+    }
+
     static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> 
iterator) {
         final List<KeyValue<K, V>> results = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 276930f..3660e8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,7 +34,7 @@ public class QueryableStoreProviderTest {
 
     @Before
     public void before() {
-        final StateStoreProviderStub theStoreProvider = new 
StateStoreProviderStub();
+        final StateStoreProviderStub theStoreProvider = new 
StateStoreProviderStub(false);
         theStoreProvider.addStore(keyValueStore, new 
StateStoreTestUtils.NoOpReadOnlyStore<>());
         theStoreProvider.addStore(windowStore, new NoOpWindowStore());
         storeProvider =

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 5b88eb8..2082e00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -32,9 +33,13 @@ import java.util.Map;
 public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, 
V>, StateStore {
 
     private final Map<Long, Map<K, V>> data = new HashMap<>();
+    private boolean open  = true;
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
         final List<KeyValue<Long, V>> results = new ArrayList<>();
         for (long now = timeFrom; now <= timeTo; now++) {
             final Map<K, V> kvMap = data.get(now);
@@ -79,7 +84,11 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
     @Override
     public boolean isOpen() {
-        return false;
+        return open;
+    }
+
+    public void setOpen(final boolean open) {
+        this.open = open;
     }
 
     private class TheWindowStoreIterator<E> implements WindowStoreIterator<E> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java
deleted file mode 100644
index 3712990..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class StateStoreProviderStub implements StateStoreProvider {
-
-    private final Map<String, StateStore> stores = new HashMap<>();
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> List<T> stores(final String storeName, final 
QueryableStoreType<T> queryableStoreType) {
-        if (stores.containsKey(storeName) && 
queryableStoreType.accepts(stores.get(storeName))) {
-            return (List<T>) Collections.singletonList(stores.get(storeName));
-        }
-        return Collections.emptyList();
-    }
-
-    public void addStore(final String storeName,
-                         final StateStore store) {
-        stores.put(storeName, store);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index 710557e..708e153 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.test.StateStoreProviderStub;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,8 +38,8 @@ public class WrappingStoreProviderTest {
 
     @Before
     public void before() {
-        final StateStoreProviderStub stubProviderOne = new 
StateStoreProviderStub();
-        final StateStoreProviderStub stubProviderTwo = new 
StateStoreProviderStub();
+        final StateStoreProviderStub stubProviderOne = new 
StateStoreProviderStub(false);
+        final StateStoreProviderStub stubProviderTwo = new 
StateStoreProviderStub(false);
 
 
         stubProviderOne.addStore("kv", 
StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d74196f/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
new file mode 100644
index 0000000..f17777f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.internals.StateStoreProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StateStoreProviderStub implements StateStoreProvider {
+
+    private final Map<String, StateStore> stores = new HashMap<>();
+    private final boolean throwException;
+
+    public StateStoreProviderStub(final boolean throwException) {
+
+        this.throwException = throwException;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> List<T> stores(final String storeName, final 
QueryableStoreType<T> queryableStoreType) {
+        if (throwException) {
+            throw new InvalidStateStoreException("store is unavailable");
+        }
+        if (stores.containsKey(storeName) && 
queryableStoreType.accepts(stores.get(storeName))) {
+            return (List<T>) Collections.singletonList(stores.get(storeName));
+        }
+        return Collections.emptyList();
+    }
+
+    public void addStore(final String storeName,
+                         final StateStore store) {
+        stores.put(storeName, store);
+    }
+
+}

Reply via email to