[
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346626#comment-16346626
]
ASF GitHub Bot commented on KAFKA-6378:
---------------------------------------
andybryant opened a new pull request #4494: KAFKA-6378 KStream-GlobalKTable
null KeyValueMapper handling
URL: https://github.com/apache/kafka/pull/4494
For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate
no match
For KStream-GlobalKTable joins, a `KeyValueMapper` is used to derive, from each
stream record, the key to look up in the `GlobalKTable`. For some stream values
there may be no valid reference into the table. This patch allows developers to
return `null` from the mapper to indicate that no match is possible. This is
safe because `null` is never a valid key in a `GlobalKTable`.
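The join semantics described above can be illustrated with a plain-Java model. It needs Java 9+ and makes these assumptions:

- An in-memory `Map` stands in for the `GlobalKTable`.
- `BiFunction`s stand in for `KeyValueMapper` and `ValueJoiner`.

This is neither the Kafka Streams API nor the code in this PR. The class `NullMapperJoinModel`, its `join` and `demo` methods, and the sample data are hypothetical. A `null` mapped key skips the table lookup. The left join still emits the record with a `null` table value, while the inner join drops it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a KStream-GlobalKTable join; NOT the Kafka
// Streams API. A Map stands in for the GlobalKTable, and BiFunctions stand in
// for KeyValueMapper (key, value -> table key) and ValueJoiner.
class NullMapperJoinModel {

    static <K, V, GK, GV, R> List<R> join(List<Map.Entry<K, V>> stream,
                                          Map<GK, GV> table,
                                          BiFunction<K, V, GK> keyMapper,
                                          BiFunction<V, GV, R> joiner,
                                          boolean leftJoin) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> record : stream) {
            GK tableKey = keyMapper.apply(record.getKey(), record.getValue());
            // A null mapped key means "no possible match": skip the lookup
            // instead of passing null to the table.
            GV tableValue = (tableKey == null) ? null : table.get(tableKey);
            // Inner join drops unmatched records; left join emits them with a
            // null table value.
            if (tableValue != null || leftJoin) {
                out.add(joiner.apply(record.getValue(), tableValue));
            }
        }
        return out;
    }

    // Hypothetical sample data: order id -> customer id, where "none" marks a
    // missing foreign key and "c9" is a key that is absent from the table.
    static List<String> demo(boolean leftJoin) {
        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Alice");
        List<Map.Entry<String, String>> orders = List.of(
                Map.entry("o1", "c1"),
                Map.entry("o2", "none"),
                Map.entry("o3", "c9"));
        return join(orders, customers,
                (orderId, customerId) -> "none".equals(customerId) ? null : customerId,
                (customerId, name) -> customerId + "->" + name,
                leftJoin);
    }

    public static void main(String[] args) {
        System.out.println("left:  " + demo(true));   // left:  [c1->Alice, none->null, c9->null]
        System.out.println("inner: " + demo(false));  // inner: [c1->Alice]
    }
}
```

In this model, a `null` mapped key and a non-null key that is simply absent from the table produce the same join output. The difference is that the `null` key never reaches the table lookup.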
Without this patch, returning `null` from the `KeyValueMapper` crashes the
stream with a `NullPointerException` on Kafka 1.0.
I added unit tests for KStream-GlobalKTable left and inner joins, since they
were missing. The tests also cover the scenario where the `KeyValueMapper`
returns `null`, to ensure it is handled correctly.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper
> returns null
> --------------------------------------------------------------------------------------
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andy Bryant
> Priority: Major
> Fix For: 1.0.1
>
>
> On a Stream->GlobalKTable leftJoin, if the KeyValueMapper returns null, the
> stream fails with a NullPointerException (see the stack trace below). On Kafka
> 0.11.0.0 the stream processes such records successfully, calling the
> ValueJoiner with the table value set to null.
> The use case for this is joining a stream to a table containing reference
> data, where the stream's foreign key may be null. With Kafka 1.0.0 there is no
> straightforward workaround other than generating a key that will never match,
> or branching the stream to separate out records that have no foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
>     at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>     at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
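The trace shows the lookup failing in `Objects.requireNonNull` inside `CachingKeyValueStore.get`, meaning the null mapped key reaches the state store. The plain-Java model below (Java 8+) reproduces that failure. It is not Kafka Streams code, and `NullKeyLookupModel`, `StrictStore` and `NEVER_MATCHES` are hypothetical names. The model contrasts the failure with two alternatives:

- the sentinel-key workaround mentioned in the report;
- a guard that treats a null key as "no match", one way to express the behaviour the PR describes, not the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical plain-Java model, NOT Kafka Streams code: a store whose get()
// rejects null keys, as the top frames of the stack trace show.
class NullKeyLookupModel {

    // Hypothetical stand-in for a state store that rejects null keys.
    static final class StrictStore {
        private final Map<String, String> data = new HashMap<>();

        void put(String key, String value) {
            data.put(Objects.requireNonNull(key, "key cannot be null"), value);
        }

        String get(String key) {
            // Mirrors the failing frame: the key is null-checked before lookup.
            Objects.requireNonNull(key, "key cannot be null");
            return data.get(key);
        }
    }

    // Hypothetical sentinel assumed never to be used as a real table key.
    static final String NEVER_MATCHES = "__no_such_key__";

    // Unguarded lookup: a null mapped key throws NullPointerException.
    static String lookupUnguarded(StrictStore store, String mappedKey) {
        return store.get(mappedKey);
    }

    // Workaround from the report: swap a null key for a key that never matches.
    static String lookupWithSentinel(StrictStore store, String mappedKey) {
        return store.get(mappedKey == null ? NEVER_MATCHES : mappedKey);
    }

    // Guard: a null key means "no match", so the store is never consulted.
    static String lookupGuarded(StrictStore store, String mappedKey) {
        return mappedKey == null ? null : store.get(mappedKey);
    }

    public static void main(String[] args) {
        StrictStore store = new StrictStore();
        store.put("c1", "Alice");
        System.out.println(lookupGuarded(store, null));       // null
        System.out.println(lookupWithSentinel(store, null));  // null
        try {
            lookupUnguarded(store, null);
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup failed: " + e.getMessage());
        }
    }
}
```

The sentinel approach still performs a store lookup for every record. It also assumes the sentinel never collides with a real table key, whereas the guard avoids the lookup altogether.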
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)