[
https://issues.apache.org/jira/browse/KAFKA-6451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344369#comment-16344369
]
ASF GitHub Bot commented on KAFKA-6451:
---------------------------------------
guozhangwang closed pull request #4477: KAFKA-6451: Simplifying KStreamReduce
and KStreamAggregate
URL: https://github.com/apache/kafka/pull/4477
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b1abdc29de0..95ad78e0428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -64,20 +64,21 @@ public void init(ProcessorContext context) {
@Override
public void process(K key, V value) {
- if (key == null)
+ // If the key or value is null we don't need to proceed
+ if (key == null || value == null) {
return;
+ }
T oldAgg = store.get(key);
- if (oldAgg == null)
+ if (oldAgg == null) {
oldAgg = initializer.apply();
+ }
T newAgg = oldAgg;
// try to add the new value
- if (value != null) {
- newAgg = aggregator.apply(key, value, newAgg);
- }
+ newAgg = aggregator.apply(key, value, newAgg);
// update the store with the new value
store.put(key, newAgg);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index d339624c8a0..0fd8f757cfd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -61,21 +61,21 @@ public void init(ProcessorContext context) {
@Override
public void process(K key, V value) {
- // If the key is null we don't need to proceed
- if (key == null)
+ // If the key or value is null we don't need to proceed
+ if (key == null || value == null) {
return;
+ }
V oldAgg = store.get(key);
V newAgg = oldAgg;
// try to add the new value
- if (value != null) {
- if (newAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(newAgg, value);
- }
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
}
+
// update the store with the new value
store.put(key, newAgg);
tupleForwarder.maybeForward(key, newAgg, oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index c8b7c1856b7..9d8b4797efb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -621,6 +621,7 @@ private void processData() {
driver.process(TOPIC, "1", "D");
driver.process(TOPIC, "3", "E");
driver.process(TOPIC, "3", "F");
+ driver.process(TOPIC, "3", null);
driver.flushState();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Simplify KStreamReduce
> ----------------------
>
> Key: KAFKA-6451
> URL: https://issues.apache.org/jira/browse/KAFKA-6451
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Tanvi Jaywant
> Priority: Minor
> Labels: beginner, newbie
> Fix For: 1.1.0
>
>
> When we do aggregations, we drop records with {{key=null}} or {{value=null}}.
> However, {{KStreamReduce}} exits early only on {{key=null}} and still processes
> {{value=null}}. Even though it merely writes the old value back to the state
> store and forwards only the old value downstream (i.e., it still computes the
> correct result), this is undesired and wasteful; we should exit early on
> {{value=null}}, too.
> The same problem might occur in {{KStreamAggregate}} or other processors, so
> we need to double-check those as well to make sure we implement consistent
> behavior.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)