[ https://issues.apache.org/jira/browse/SAMZA-1018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15497455#comment-15497455 ]
Yi Pan (Data Infrastructure) commented on SAMZA-1018:
-----------------------------------------------------
OK. I missed the email discussion. Posting it here for the record:
Hey Navina,
This was consistently reproducible both locally and in our integration test
environment. We have auto.create.topics.enable on our brokers (or more
accurately, we do not have it disabled; it's the default). I did not mean to
imply there is a problem with the logic of the change in SAMZA-971; I
understand the desire to make fewer calls, but back then I had not had a chance
to dig in and pin down the root cause of the difference. I think
I've found it now though.
Prior to the 971 fix, we eventually wind up in
KafkaSystemAdmin.getTopicsAndPartitionsByBroker(), which contains this code:
KafkaUtil.maybeThrowException(topicMetadata.errorCode)
What I found was that this was indeed throwing a LeaderNotAvailableException in
the case where the topic did not already exist. This has the effect of
triggering a retry in KafkaSystemAdmin.getSystemStreamMetadata(), and this
continues until the broker has finished creating the topic and returns the
correct partition metadata. The optimized path introduced by the SAMZA-971 fix
goes into KafkaSystemAdmin.getSystemStreamPartitionCounts() which does not
check this errorCode, and simply returns an empty set of partitions. Does that
make sense?
-Tommy
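A minimal Java sketch of the two code paths Tommy describes, under stated assumptions: the broker, the metadata type and every method name here are hypothetical stand-ins, not the real KafkaSystemAdmin or Kafka client APIs. Only the error codes (0 = NONE, 5 = LEADER_NOT_AVAILABLE) come from the Kafka protocol. The checked path treats a non-zero error code as a failure and retries while the broker is still auto-creating the topic; the unchecked path returns whatever partition list came back, which is empty while creation is in progress.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the two metadata paths; none of these classes or
// methods are real Samza or Kafka APIs.
public class MetadataPathsSketch {
    // Kafka protocol error codes: 0 = NONE, 5 = LEADER_NOT_AVAILABLE.
    static final short NONE = 0;
    static final short LEADER_NOT_AVAILABLE = 5;

    // Hypothetical topic metadata: an error code plus the partitions reported.
    static final class TopicMetadata {
        final short errorCode;
        final List<Integer> partitions;

        TopicMetadata(short errorCode, List<Integer> partitions) {
            this.errorCode = errorCode;
            this.partitions = partitions;
        }
    }

    // Hypothetical broker that is still auto-creating the topic for the first
    // `pendingCalls` requests, then reports `partitionCount` partitions.
    static final class FakeBroker {
        private int pendingCalls;
        private final int partitionCount;

        FakeBroker(int pendingCalls, int partitionCount) {
            this.pendingCalls = pendingCalls;
            this.partitionCount = partitionCount;
        }

        TopicMetadata fetch() {
            if (pendingCalls > 0) {
                pendingCalls--;
                return new TopicMetadata(LEADER_NOT_AVAILABLE, Collections.<Integer>emptyList());
            }
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < partitionCount; p++) {
                partitions.add(p);
            }
            return new TopicMetadata(NONE, partitions);
        }
    }

    // Pre-SAMZA-971 style: a non-zero error code is treated as a failure and
    // the request is retried until the metadata is usable.
    static List<Integer> partitionsWithErrorCheck(FakeBroker broker, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            TopicMetadata md = broker.fetch();
            if (md.errorCode == NONE) {
                return md.partitions;
            }
            try {
                Thread.sleep(10L); // short back-off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while retrying", e);
            }
        }
        throw new IllegalStateException("topic metadata still unavailable after " + maxAttempts + " attempts");
    }

    // Optimized style: the error code is ignored, so a topic that is still
    // being created shows up as an empty partition list.
    static List<Integer> partitionsWithoutErrorCheck(FakeBroker broker) {
        return broker.fetch().partitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionsWithoutErrorCheck(new FakeBroker(2, 4))); // []
        System.out.println(partitionsWithErrorCheck(new FakeBroker(2, 4), 5)); // [0, 1, 2, 3]
    }
}
```

Against the same simulated broker, the unchecked path prints an empty list on its first call, while the checked path keeps retrying until the topic exists and then returns all four partitions.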
> Cannot consume non-existent topic
> ---------------------------------
>
> Key: SAMZA-1018
> URL: https://issues.apache.org/jira/browse/SAMZA-1018
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.10.1
> Environment: Kafka 0.10.0.0
> Reporter: Tommy Becker
>
> When starting a stream job that consumes a topic that does not yet exist, the
> job dies with the following exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalArgumentException: No tasks found. Likely due to no input partitions. Can't run a job with no tasks.
> at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
> at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
> at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
> at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
> at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
> at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
> at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
> at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
> at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
> at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
> at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
> at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> {noformat}
> This seems to be caused by the fix for SAMZA-971, specifically passing
> partitionsMetadataOnly = true to the StreamMetadataCache in
> JobModelManager#getInputStreamPartitions
> https://github.com/apache/samza/commit/920f803a2e3dab809f4d7bb518259b0f4164407f
> Note the input topic is subsequently created, so a restart of the job will
> likely succeed. Being able to consume topics which have not yet been created
> is nice to avoid imposing a startup order between the jobs and the processes
> which produce to the topics. Setting partitionsMetadataOnly back to false
> fixed the issue for us, but I'm not sure if this is the best fix. For the
> record, we are using Kafka 0.10.0.0.
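To illustrate how an empty partition set turns into the "No tasks found" failure in the trace above, here is a hypothetical Java sketch. It groups input partitions into one task per partition id (loosely in the spirit of a group-by-partition grouper; this is not Samza's actual grouper code, and the class and method names are invented for illustration) and then rejects an empty task set with the same message reported by GroupByContainerCount.validateTasks. A topic whose partition count comes back as 0 yields no tasks at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: how zero input partitions become zero tasks. The names
// are illustrative and do not reproduce Samza's grouper classes.
public class EmptyInputSketch {
    // One task per partition id; each task lists the "stream#partition" inputs it owns.
    static Map<String, List<String>> groupByPartition(Map<String, Integer> partitionCounts) {
        Map<String, List<String>> tasks = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : partitionCounts.entrySet()) {
            for (int p = 0; p < entry.getValue(); p++) {
                tasks.computeIfAbsent("Partition " + p, k -> new ArrayList<>())
                     .add(entry.getKey() + "#" + p);
            }
        }
        return tasks;
    }

    // Mirrors the check reported in the stack trace: an empty task set is rejected.
    static void validateTasks(Map<String, List<String>> tasks) {
        if (tasks.isEmpty()) {
            throw new IllegalArgumentException(
                "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new TreeMap<>();
        counts.put("not-yet-created-topic", 0); // what the optimized metadata path reports
        Map<String, List<String>> tasks = groupByPartition(counts);
        try {
            validateTasks(tasks);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a partition count of 0 the grouping step produces an empty map, so validation fails before any container starts. Once the topic exists, a restart sees a non-zero count and proceeds, consistent with the observation that restarting the job will likely succeed.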
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)