Guozhang Wang created KAFKA-3902:
------------------------------------
Summary: Optimize KTable.filter() to reduce unnecessary traffic
Key: KAFKA-3902
URL: https://issues.apache.org/jira/browse/KAFKA-3902
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Guozhang Wang
The {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and it can be
optimized to reduce unnecessary data traffic to downstream operators. More
specifically:
1. Some context: when a KTable participates in a downstream operator (e.g. if
that operator is an aggregation), we need to materialize this KTable and send
both its old value and its new value as a pair {old -> new} to the downstream
operator. In practice the pair is usually needed, but not always, so the two
cases ("send old value" disabled and enabled) are discussed separately below.
Take the following example source stream for your KTable:
{{<a: 1>, <b: 2>, <a: 3> ...}}
When the KTable is materialized, it transforms the source messages into the
following pairs:
{{<a: {null -> 1}>, <b: {null -> 2}>, <a: {1 -> 3}>}}
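A minimal sketch of the materialization step described above. `OldNewPair` and `MaterializedTable` are hypothetical names, not Kafka Streams classes; the sketch only assumes that the table keeps the latest value per key and treats a null new value as a deletion of that key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical {old -> new} pair; not a Kafka Streams class.
final class OldNewPair<V> {
    final V oldValue;
    final V newValue;

    OldNewPair(V oldValue, V newValue) {
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    @Override
    public String toString() {
        return "{" + oldValue + " -> " + newValue + "}";
    }
}

// Hypothetical in-memory materializer: remembers the latest value per key so
// that each update can be emitted together with the value it replaces.
final class MaterializedTable<K, V> {
    private final Map<K, V> store = new HashMap<>();

    OldNewPair<V> update(K key, V newValue) {
        // Map.put/Map.remove return the previous value, or null if none existed.
        V oldValue = (newValue == null) ? store.remove(key) : store.put(key, newValue);
        return new OldNewPair<>(oldValue, newValue);
    }
}
```

Feeding the example stream <a: 1>, <b: 2>, <a: 3> through `update()` yields {null -> 1}, {null -> 2} and {1 -> 3}, matching the pairs above.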
2. If "send old value" is not enabled, then when the filter predicate returns
false, we MUST send a <key: null> to the downstream operator to indicate that
this key has been filtered out of the table. Otherwise, if your filter is for
example "value < 2", the update <a: 3> would simply be dropped and the
downstream operator would still hold the stale value <a: 1>, resulting in
incorrect semantics.
If the predicate returns true, we still send the original <key: value> to the
downstream operators.
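The following sketch illustrates why a filtered-out value must be forwarded as null rather than dropped when "send old value" is disabled. `NewValueOnlyFilter` and `DownstreamView` are hypothetical classes, not Kafka APIs; the downstream operator is assumed to treat a null value as a deletion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical filter used when "send old value" is disabled:
// only the new value for a key is forwarded downstream.
final class NewValueOnlyFilter<V> {
    private final Predicate<V> predicate;

    NewValueOnlyFilter(Predicate<V> predicate) {
        this.predicate = predicate;
    }

    // Returns the value to forward. A value failing the predicate is turned
    // into null (a deletion) instead of being dropped, so downstream
    // operators remove any stale value they hold for the key.
    V apply(V newValue) {
        if (newValue == null) {
            return null; // a deletion is forwarded as-is
        }
        return predicate.test(newValue) ? newValue : null;
    }
}

// Hypothetical downstream operator that mirrors the filtered table.
final class DownstreamView<K, V> {
    final Map<K, V> view = new HashMap<>();

    void receive(K key, V value) {
        if (value == null) {
            view.remove(key);
        } else {
            view.put(key, value);
        }
    }
}
```

With the predicate "value < 2" and the stream <a: 1>, <b: 2>, <a: 3>, the view ends up empty, because <a: 3> reaches downstream as <a: null>. Had the filter dropped that update instead, the view would still contain a = 1.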
3. If "send old value" is enabled, then there are several cases to consider,
some of which can be optimized:
a. If the old value is <key: null> and the new value is <key: not-null>, and
the filter predicate returns false for the new value, then it is safe to
optimize and send nothing to the downstream operator, since we know there was
no previous value for the key anyway; otherwise we send the original pair.
b. If the old value is <key: not-null> and the new value is <key: null>,
indicating a deletion of this key, and the filter predicate returns false for
the old value, then it is safe to optimize and send nothing to the downstream
operator, since we know the old value was already filtered out by a previous
message; otherwise we send the original pair.
c. If both old and new values are not null, and the predicate:
1) returns true on both, send the original pair;
2) returns false on both, we can optimize and send nothing;
3) returns true on old and false on new, send <key: {old -> null}>;
4) returns false on old and true on new, send <key: {null -> new}>.
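The case analysis above can be sketched in Java as follows. `ValuePair` and `OldValueAwareFilter` are hypothetical names, not Kafka's internal classes. The sketch filters the old and new values independently and forwards nothing when both come out null; checking each case shows this single rule reproduces a, b and c.1 to c.4. A pair whose old and new values are both null is not covered by the cases above; this sketch assumes nothing is forwarded for it.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical {old -> new} pair; not Kafka's internal change class.
final class ValuePair<V> {
    final V oldValue;
    final V newValue;

    ValuePair(V oldValue, V newValue) {
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ValuePair)) return false;
        ValuePair<?> other = (ValuePair<?>) o;
        return Objects.equals(oldValue, other.oldValue)
            && Objects.equals(newValue, other.newValue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(oldValue, newValue);
    }

    @Override
    public String toString() {
        return "{" + oldValue + " -> " + newValue + "}";
    }
}

// Hypothetical filter used when "send old value" is enabled.
final class OldValueAwareFilter {
    private OldValueAwareFilter() {}

    // Keep a side of the pair only if it is non-null and passes the predicate.
    private static <V> V keepIfMatches(V value, Predicate<V> predicate) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    // Returns the pair to forward downstream, or null to forward nothing.
    static <V> ValuePair<V> filter(ValuePair<V> change, Predicate<V> predicate) {
        V oldValue = keepIfMatches(change.oldValue, predicate);
        V newValue = keepIfMatches(change.newValue, predicate);
        if (oldValue == null && newValue == null) {
            // Nothing to forward: a/b filtered out, c.2, or (assumed) both inputs null.
            return null;
        }
        // Covers the "original pair" cases as well as c.3 and c.4.
        return new ValuePair<>(oldValue, newValue);
    }
}
```

Expressing the optimization as one rule keeps the forwarding decision in a single place, so no case can be forgotten when the code changes.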
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)