Guozhang Wang created KAFKA-4601:
------------------------------------
Summary: Avoid duplicated repartitioning in KStream DSL
Key: KAFKA-4601
URL: https://issues.apache.org/jira/browse/KAFKA-4601
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Guozhang Wang
Consider the following DSL:
{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1").map(..);
KTable<String, Long> counts = source
    .groupByKey()
    .count("Counts");
KStream<String, String> sink = source.leftJoin(counts, ..);
{code}
The resulting topology looks like this:
{code}
ProcessorTopology:
    KSTREAM-SOURCE-0000000000:
        topics: [topic1]
        children: [KSTREAM-MAP-0000000001]
    KSTREAM-MAP-0000000001:
        children: [KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
    KSTREAM-FILTER-0000000004:
        children: [KSTREAM-SINK-0000000003]
    KSTREAM-SINK-0000000003:
        topic: X-Counts-repartition
    KSTREAM-FILTER-0000000007:
        children: [KSTREAM-SINK-0000000006]
    KSTREAM-SINK-0000000006:
        topic: X-KSTREAM-MAP-0000000001-repartition
ProcessorTopology:
    KSTREAM-SOURCE-0000000008:
        topics: [X-KSTREAM-MAP-0000000001-repartition]
        children: [KSTREAM-LEFTJOIN-0000000009]
    KSTREAM-LEFTJOIN-0000000009:
        states: [Counts]
    KSTREAM-SOURCE-0000000005:
        topics: [X-Counts-repartition]
        children: [KSTREAM-AGGREGATE-0000000002]
    KSTREAM-AGGREGATE-0000000002:
        states: [Counts]
{code}
That is, there are two repartition topics, one for the aggregation and one for
the join. This not only introduces unnecessary overhead but also breaks the
expected processing order: users expect each record to go through the
aggregation first and then the join operator. To get the simpler topology
below, users today need to manually add a {{through}} operator after {{map}}
to enforce repartitioning:
{code}
ProcessorTopology:
    KSTREAM-SOURCE-0000000000:
        topics: [topic1]
        children: [KSTREAM-MAP-0000000001]
    KSTREAM-MAP-0000000001:
        children: [KSTREAM-SINK-0000000002]
    KSTREAM-SINK-0000000002:
        topic: topic2
ProcessorTopology:
    KSTREAM-SOURCE-0000000003:
        topics: [topic2]
        children: [KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
    KSTREAM-AGGREGATE-0000000004:
        states: [Counts]
    KSTREAM-LEFTJOIN-0000000005:
        states: [Counts]
{code}
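For reference, the manual workaround described above might look roughly like the sketch below. It assumes the 0.10.x-era {{KStreamBuilder}} API used in the first snippet. The class name, the mapper, the joiner, and the intermediate topic name {{topic2}} are placeholders rather than anything taken from this issue, and the intermediate topic would have to be created by the user beforehand.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class ThroughWorkaround {

    public static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder
            .stream(Serdes.String(), Serdes.String(), "topic1")
            // Placeholder key-changing map: re-key each record by its value.
            .map((key, value) -> KeyValue.pair(value, value))
            // Explicit repartitioning: write to and re-read from a
            // user-managed topic ("topic2" is an assumed name).
            .through(Serdes.String(), Serdes.String(), "topic2");

        // Both downstream operators now read from the same re-keyed stream.
        KTable<String, Long> counts = source
            .groupByKey()
            .count("Counts");

        // Placeholder joiner: append the current count to the value.
        KStream<String, String> sink =
            source.leftJoin(counts, (value, count) -> value + ":" + count);

        return builder;
    }
}
```

Because {{through}} returns a stream that is read back from the intermediate topic, the aggregation and the join share that single topic instead of each triggering a repartition of its own.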
This kind of optimization should be automatic in Streams; we can consider
doing it when we extend the DSL translation beyond the current
one-operator-at-a-time approach.
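As a rough illustration of what a whole-plan pass could do, the sketch below models a logical plan in plain Java and compares per-operator translation with a translation that shares one repartition topic per key-changing parent. Everything in it ({{Op}}, {{naiveTopics}}, {{sharedTopics}}, the topic naming) is hypothetical and is not how Kafka Streams is implemented; it also only considers direct children of a key-changing operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of repartition-topic planning; not Kafka Streams internals. */
final class RepartitionDedup {

    /** One logical operator in a hypothetical plan. */
    static final class Op {
        final String name;
        final Op parent;           // upstream operator, or null for a source
        final boolean changesKey;  // e.g. map(), which may produce a new key
        final boolean needsKeyed;  // e.g. aggregate or join, which depend on the key

        Op(String name, Op parent, boolean changesKey, boolean needsKeyed) {
            this.name = name;
            this.parent = parent;
            this.changesKey = changesKey;
            this.needsKeyed = needsKeyed;
        }
    }

    /** True if op depends on the key and reads directly from a key-changing parent. */
    static boolean needsRepartition(Op op) {
        return op.needsKeyed && op.parent != null && op.parent.changesKey;
    }

    /** Per-operator translation: every key-dependent child gets its own topic. */
    static List<String> naiveTopics(List<Op> plan) {
        List<String> topics = new ArrayList<>();
        for (Op op : plan) {
            if (needsRepartition(op)) {
                topics.add(op.name + "-repartition");
            }
        }
        return topics;
    }

    /** Whole-plan translation: children of the same key-changing parent share one topic. */
    static List<String> sharedTopics(List<Op> plan) {
        Set<String> topics = new LinkedHashSet<>();
        for (Op op : plan) {
            if (needsRepartition(op)) {
                topics.add(op.parent.name + "-repartition");
            }
        }
        return new ArrayList<>(topics);
    }

    public static void main(String[] args) {
        Op source = new Op("SOURCE", null, false, false);
        Op map = new Op("MAP", source, true, false);
        Op aggregate = new Op("AGGREGATE", map, false, true);
        Op leftJoin = new Op("LEFTJOIN", map, false, true);
        List<Op> plan = Arrays.asList(source, map, aggregate, leftJoin);

        System.out.println("per-operator: " + naiveTopics(plan));
        System.out.println("shared:       " + sharedTopics(plan));
    }
}
```

For the plan in this issue (a {{map}} feeding both an aggregation and a join), the per-operator variant yields two topics while the shared variant yields one, which mirrors the difference between the two topologies above.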