This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e32dcb9 KAFKA-6878: NPE when querying global state store not in READY
state (#4978)
e32dcb9 is described below
commit e32dcb9a669bc354cc97cb45f14f0dbad9657693
Author: tedyu <[email protected]>
AuthorDate: Wed May 9 10:42:10 2018 -0700
KAFKA-6878: NPE when querying global state store not in READY state (#4978)
Check whether cache is null before retrieving from cache.
Reviewers: Guozhang Wang <[email protected]>, Bill Bejeck
<[email protected]>
---
.../apache/kafka/streams/state/internals/CachingKeyValueStore.java | 5 ++++-
.../org/apache/kafka/streams/state/internals/CachingWindowStore.java | 3 +++
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 45f606f..16684e3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,7 +163,10 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
private byte[] getInternal(final Bytes key) {
- final LRUCacheEntry entry = cache.get(cacheName, key);
+ LRUCacheEntry entry = null;
+ if (cache != null) {
+ entry = cache.get(cacheName, key);
+ }
if (entry == null) {
final byte[] rawValue = underlying.get(key);
if (rawValue == null) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 58111a6..1d0455b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -160,6 +160,9 @@ class CachingWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore impl
validateStoreOpen();
final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key,
timestamp, 0);
final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
+ if (cache == null) {
+ return underlying.fetch(key, timestamp);
+ }
final LRUCacheEntry entry = cache.get(name, cacheKey);
if (entry == null) {
return underlying.fetch(key, timestamp);
--
To stop receiving notification emails like this one, please contact
[email protected].