Hi again (again), Ties,
Sorry for the confusion. I was talking to someone else about this and
started to file a ticket to fix it, but once I looked into it I realized
that, contrary to what I said in my last message, there is actually no
repartition topic for a stream-globalTable join.
So, if you do something like:
=====
public static void main(String[] args) {
    final StreamsBuilder streamsBuilder = new StreamsBuilder();

    // Re-key the stream before the join.
    final KStream<Object, Object> left =
        streamsBuilder.stream("left").selectKey((ok, ov) -> "newK" + ok);
    final GlobalKTable<Object, Object> right =
        streamsBuilder.globalTable("right");

    // Look up the table by the stream's new key; pair the two values.
    final KStream<Object, KeyValue<Object, Object>> join =
        left.join(right, (ok, ov) -> ok, KeyValue::new);
    join.to("out");

    // Print the topology to inspect the topics it uses.
    final Topology build = streamsBuilder.build();
    System.out.println(build.describe());
}
=====
(note the selectKey on the stream),
then you should get the result you expect.
Sorry again for my multiple replies.
-John
On Mon, Jul 15, 2019 at 11:35 AM John Roesler wrote:
>
> Hi again, Ties,
>
> I think I spoke too soon and also misread your email.
>
> By any chance, are you doing a join of a KStream and a GlobalKTable?
>
> In this case, it would make perfect sense to do what you're doing, but
> unfortunately the current implementation doesn't support it.
>
> Your workaround would be to use KStream.selectKey on the left side to
> pick a key before the join. Unfortunately, this will create a
> repartition topic that is unnecessary when you're joining with a
> GlobalKTable.
>
> On the other hand, you could at that point switch to a regular
> KStream/KTable join and reduce the memory/storage requirements, as
> each node won't have to host the whole global data set anymore.
>
> Please feel free to share your code in some form to clear up the
> situation in case I got it wrong again.
>
> Thanks,
> -John
>
> On Mon, Jul 15, 2019 at 10:48 AM John Roesler wrote:
> >
> > Hi Ties,
> >
> > You're on the right track. You need to use `KTable.map` ahead of the
> > join to select the new key. This will allow Streams to make sure the
> > data is correctly partitioned to perform the join.
> >
> > Thanks,
> > -John
> >
> > On Mon, Jul 15, 2019 at 10:07 AM Ven, Ties Jens van de wrote:
> > >
> > > > I recently started working with Kafka Streams and I noticed some odd
> > > behavior.
> > >
> > > > I was using a KTable left join with a null key, and of course this will
> > > > not work, since the join is based on keys.
> > > But I also supplied a KeyValueMapper, which takes a property from the
> > > value and returns this as key, and uses this value to join.
> > >
> > > > It turns out that in the code, it first checks whether the key is null,
> > > > and if so, it skips the record.
> > > Would it be more logical to check the result of the keyMapper for null
> > > instead of the actual key?
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
> > >
> > > Kind regards
> > >
> > > Ties