This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 34a1f70 KAFKA-6826 avoid range scans when forwarding values during
aggregation (#4927)
34a1f70 is described below
commit 34a1f7099b65f43037602eb13cbffd7df276e290
Author: Xavier Léauté <[email protected]>
AuthorDate: Wed Apr 25 08:36:14 2018 -0700
KAFKA-6826 avoid range scans when forwarding values during aggregation
(#4927)
Reviewers: Matthias J Sax <matthias@confluentio>, Bill Bejeck
<[email protected]>, John Roesler <[email protected]>, Guozhang Wang
<[email protected]>
---
.../kafka/streams/state/internals/CachingWindowStore.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 9ef41ce..58111a6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -219,13 +219,11 @@ class CachingWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore impl
}
private V fetchPrevious(final Bytes key, final long timestamp) {
- try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key,
timestamp, timestamp)) {
- if (!iter.hasNext()) {
- return null;
- } else {
- return serdes.valueFrom(iter.next().value);
- }
+ final byte[] value = underlying.fetch(key, timestamp);
+ if (value != null) {
+ return serdes.valueFrom(value);
}
+ return null;
}
@Override
--
To stop receiving notification emails like this one, please contact
[email protected].