This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05e42e1 KAFKA-6451: Simplifying KStreamReduce and KStreamAggregate
05e42e1 is described below
commit 05e42e1df12d87a29c8127dec23d7a9c290901d6
Author: Tanvi Jaywant <[email protected]>
AuthorDate: Mon Jan 29 17:28:49 2018 -0800
KAFKA-6451: Simplifying KStreamReduce and KStreamAggregate
[KAFKA-6451](https://issues.apache.org/jira/browse/KAFKA-6451)
Simplified KStreamReduce and KStreamAggregate.
Updated comments in KStreamAggregate.
Author: Tanvi Jaywant <[email protected]>
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>
Closes #4477 from tanvijaywant31/KAFKA-6451
---
.../streams/kstream/internals/KStreamAggregate.java | 11 ++++++-----
.../kafka/streams/kstream/internals/KStreamReduce.java | 16 ++++++++--------
.../kstream/internals/KGroupedStreamImplTest.java | 1 +
3 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b1abdc2..95ad78e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -64,20 +64,21 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
@Override
public void process(K key, V value) {
- if (key == null)
+ // If the key or value is null we don't need to proceed
+ if (key == null || value == null) {
return;
+ }
T oldAgg = store.get(key);
- if (oldAgg == null)
+ if (oldAgg == null) {
oldAgg = initializer.apply();
+ }
T newAgg = oldAgg;
// try to add the new value
- if (value != null) {
- newAgg = aggregator.apply(key, value, newAgg);
- }
+ newAgg = aggregator.apply(key, value, newAgg);
// update the store with the new value
store.put(key, newAgg);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index d339624..0fd8f75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -61,21 +61,21 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
@Override
public void process(K key, V value) {
- // If the key is null we don't need to proceed
- if (key == null)
+ // If the key or value is null we don't need to proceed
+ if (key == null || value == null) {
return;
+ }
V oldAgg = store.get(key);
V newAgg = oldAgg;
// try to add the new value
- if (value != null) {
- if (newAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(newAgg, value);
- }
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
}
+
// update the store with the new value
store.put(key, newAgg);
tupleForwarder.maybeForward(key, newAgg, oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index c8b7c18..9d8b479 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -621,6 +621,7 @@ public class KGroupedStreamImplTest {
driver.process(TOPIC, "1", "D");
driver.process(TOPIC, "3", "E");
driver.process(TOPIC, "3", "F");
+ driver.process(TOPIC, "3", null);
driver.flushState();
}
--
To stop receiving notification emails like this one, please contact
[email protected].