[ https://issues.apache.org/jira/browse/SAMZA-289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini resolved SAMZA-289.
-----------------------------------
Resolution: Fixed
Merged and committed to both the 0.7.0 branch and master.
> Job fails with invalid checkpoint topic partition count
> -------------------------------------------------------
>
> Key: SAMZA-289
> URL: https://issues.apache.org/jira/browse/SAMZA-289
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Priority: Blocker
> Fix For: 0.7.0
>
> Attachments: SAMZA-289.0.patch
>
>
> We've been seeing failures off and on lately with messages like this:
> {noformat}
> org.apache.samza.checkpoint.kafka.KafkaCheckpointException: Checkpoint topic validation failed for topic __samza_checkpoint_samza-perf-store-all-calls_i001 because partition count 8 did not match expected partition count 64.
> {noformat}
> This causes the entire job to fail. It is triggered the first time a Samza
> job is run in a cluster (before its checkpoint topic exists), and only on
> jobs with an input stream that has more than the default partition count
> (num.partitions in the Kafka broker).
> I believe there is a race condition in KafkaCheckpointManager. Right now we
> only run KafkaCheckpointManager.createTopic in the container that owns the
> first partition:
> {code}
> def start {
>   // Only the container that owns partition 0 creates the checkpoint topic.
>   if (partitions.contains(new Partition(0))) {
>     createTopic
>   }
>   validateTopic
> }
> {code}
> If a container starts before the container that owns partition 0, it will
> only run validateTopic. This triggers a TopicMetadata request to Kafka. If
> the checkpoint topic doesn't exist, I believe the broker reports that, but it
> also creates the topic in the background (when auto.create.topics.enable is
> on). When it does this, it uses num.partitions (the default partition count)
> as the new checkpoint topic's partition count.
> If a Samza job's task.inputs list contains a stream with a non-default
> number of partitions (e.g. num.partitions=8, but task.inputs has a stream
> with 16 partitions), then this race condition can produce a checkpoint topic
> with 8 partitions, and validation will fail.
> I think the simplest fix is to strip the if statement from
> KafkaCheckpointManager.start and have all containers try to create the
> checkpoint topic. This eliminates the race condition, since whichever
> container starts first will create the checkpoint topic with the correct
> number of partitions.
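
The race and the proposed fix can be modeled in a few lines of Scala. This is a hypothetical, single-threaded sketch, not Samza's actual KafkaCheckpointManager: MockBroker, CheckpointManagerModel, Samza289Sketch, and the topic name are illustrative, and the broker's background auto-creation is simplified to happen synchronously on the first metadata lookup. The model also treats creating an existing topic as a no-op; a real Kafka topic creation request fails for an existing topic, so callers would need to tolerate that error. The default of 8 and the expected count of 64 are taken from the error message above.

```scala
import scala.collection.mutable

// Hypothetical, simplified broker with topic auto-creation enabled: a metadata
// lookup on a missing topic creates it with the default partition count.
class MockBroker(defaultPartitions: Int) {
  private val topics = mutable.Map.empty[String, Int]

  // Explicit creation; assumed to be a no-op if the topic already exists.
  def createTopic(name: String, partitions: Int): Unit =
    if (!topics.contains(name)) topics(name) = partitions

  // Metadata lookup; auto-creates a missing topic with num.partitions.
  def partitionCount(name: String): Int =
    topics.getOrElseUpdate(name, defaultPartitions)
}

// Hypothetical stand-in for KafkaCheckpointManager.start/validateTopic.
class CheckpointManagerModel(
    broker: MockBroker,
    topic: String,
    ownedPartitions: Set[Int],
    expectedPartitions: Int,
    onlyPartitionZeroCreates: Boolean) {

  def start(): Either[String, Unit] = {
    // Guarded (buggy) behavior vs. the fix: every container tries to create.
    if (!onlyPartitionZeroCreates || ownedPartitions.contains(0))
      broker.createTopic(topic, expectedPartitions)
    validateTopic()
  }

  private def validateTopic(): Either[String, Unit] = {
    val actual = broker.partitionCount(topic)
    if (actual == expectedPartitions) Right(())
    else Left(s"partition count $actual did not match expected partition count $expectedPartitions")
  }
}

object Samza289Sketch {
  // Starts containers in the given order against a fresh broker
  // (num.partitions = 8) and collects each container's validation result.
  def run(startOrder: Seq[Set[Int]], onlyPartitionZeroCreates: Boolean): Seq[Either[String, Unit]] = {
    val broker = new MockBroker(defaultPartitions = 8)
    startOrder.map { owned =>
      new CheckpointManagerModel(
        broker, "__samza_checkpoint_example", owned, 64, onlyPartitionZeroCreates).start()
    }
  }
}
```

For example, starting a container that owns only partition 1 before the one that owns partition 0 makes both validations fail in the guarded version, because the first metadata lookup auto-creates an 8-partition topic. Once every container calls createTopic, whichever one starts first creates the 64-partition topic, and both validations pass regardless of start order.
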
--
This message was sent by Atlassian JIRA
(v6.2#6252)