[
https://issues.apache.org/jira/browse/SAMZA-754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009562#comment-15009562
]
Yi Pan (Data Infrastructure) commented on SAMZA-754:
----------------------------------------------------
It turns out that this is related to a race condition between the
CoordinatorStreamSystemProducer and the CoordinatorStreamSystemConsumer in
the JobRunner code:
{code}
if (resetJobConfig) {
  info("Storing config in coordinator stream.")
  coordinatorSystemProducer.register(JobRunner.SOURCE)
  coordinatorSystemProducer.start
  coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
}
info("Loading old config from coordinator stream.")
coordinatorSystemConsumer.register
coordinatorSystemConsumer.start
coordinatorSystemConsumer.bootstrap
coordinatorSystemConsumer.stop
{code}
If the topic is empty, the config written by the coordinator system producer
may be the first message on the Kafka broker. When the coordinator system
consumer then immediately does a fetchOffset() call on the topic, the broker
may return "null" as the starting offset because the first message has not
been completely committed yet. That null causes the NPE in
CoordinatorStreamSystemConsumer.register(). The right fix is to treat a null
starting offset the same as the "oldest" case.
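A minimal sketch of that fallback, not the actual patch: StartingOffset,
Start, Oldest, At and resolve are hypothetical names made up for illustration
and are not part of Samza's API. It only shows the null check that would keep
register() from dereferencing a missing offset.

```scala
// Hypothetical helper, NOT Samza's API: decide where the consumer should
// start reading when the offset reported by the broker may be null.
object StartingOffset {
  sealed trait Start
  // Read the topic from its oldest available message.
  case object Oldest extends Start
  // Read from a specific offset reported by the broker.
  final case class At(offset: String) extends Start

  // Option(null) is None, so a null offset falls back to Oldest
  // instead of being passed along and triggering an NPE later.
  def resolve(fetchedOffset: String): Start =
    Option(fetchedOffset) match {
      case Some(offset) => At(offset)
      case None         => Oldest
    }
}
```

With this, resolve(null) yields Oldest, so a consumer registering against a
topic whose first message is still being committed would start from the
beginning of the topic.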
> Race condition between coordinator stream producer and coordinator stream
> consumer in JobRunner
> -----------------------------------------------------------------------------------------------
>
> Key: SAMZA-754
> URL: https://issues.apache.org/jira/browse/SAMZA-754
> Project: Samza
> Issue Type: Bug
> Affects Versions: 0.10.0
> Reporter: Yi Pan (Data Infrastructure)
> Fix For: 0.10.0
>
>
> Removing the checkpoint related configuration in samza-test/.../integration
> tests seems to have a negative impact:
> It results in integration test failure when there are multiple tests running
> in a row. It might be related to coordinator stream migration.