[ https://issues.apache.org/jira/browse/SAMZA-1018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15497455#comment-15497455 ]
Yi Pan (Data Infrastructure) commented on SAMZA-1018:
-----------------------------------------------------

OK. I missed the email discussion. Posting it here for the record:

Hey Navina,

This was consistently reproducible both locally and in our integration test environment. We have auto.create.topics.enable on our brokers (or more accurately, we do not have it disabled; it's the default).

I did not mean to imply there is a problem with the logic of the change in SAMZA-971; I understand the desire to make fewer calls, but at the time I did not have time to dig in and find the root cause of the difference. I think I've found it now, though.

Prior to the SAMZA-971 fix, we eventually wind up in KafkaSystemAdmin.getTopicsAndPartitionsByBroker(), which contains this code:

    KafkaUtil.maybeThrowException(topicMetadata.errorCode)

What I found was that this call does throw a LeaderNotAvailableException when the topic does not already exist. That triggers a retry in KafkaSystemAdmin.getSystemStreamMetadata(), and the retries continue until the broker has finished creating the topic and returns the correct partition metadata.

The optimized path introduced by the SAMZA-971 fix goes into KafkaSystemAdmin.getSystemStreamPartitionCounts(), which does not check this errorCode and simply returns an empty set of partitions.

Does that make sense?

-Tommy

> Cannot consume non-existent topic
> ---------------------------------
>
>                 Key: SAMZA-1018
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1018
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.10.1
>         Environment: Kafka 0.10.0.0
>            Reporter: Tommy Becker
>
> When starting a stream job that consumes a topic that does not yet exist, the
> job dies with the following exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalArgumentException: No tasks found. Likely due to no input partitions. Can't run a job with no tasks.
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
>     at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
>     at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
>     at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
>     at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
>     at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
>     at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
>     at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
>     at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
>     at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
>     at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> {noformat}
>
> This seems to be caused by the fix for SAMZA-971, specifically passing
> partitionsMetadataOnly = true to the StreamMetadataCache in
> JobModelManager#getInputStreamPartitions:
> https://github.com/apache/samza/commit/920f803a2e3dab809f4d7bb518259b0f4164407f
>
> Note that the input topic is subsequently created, so a restart of the job will
> likely succeed. Being able to consume topics that have not yet been created is
> useful because it avoids imposing a startup order between the jobs and the
> processes that produce to those topics. Setting partitionsMetadataOnly back to
> false fixed the issue for us, but I'm not sure if this is the best fix. For the
> record, we are using Kafka 0.10.0.0.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
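The difference Tommy describes (one lookup path turns a non-zero errorCode into an exception and retries, the other ignores it and returns no partitions) can be illustrated with a small self-contained Java sketch. Everything in it is hypothetical: `FakeBroker`, `TopicMetadata`, `LeaderNotAvailable`, `partitionsWithRetry` and `partitionsNoErrorCheck` are not Samza or Kafka APIs. They only model a broker with topic auto-creation that reports LEADER_NOT_AVAILABLE (Kafka protocol error code 5) for the first few lookups of a new topic, and a `validateTasks` check that mirrors the "No tasks found" failure in the stack trace.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MissingTopicSketch {

    // Kafka protocol error codes: 0 = no error, 5 = LEADER_NOT_AVAILABLE.
    static final int NO_ERROR = 0;
    static final int LEADER_NOT_AVAILABLE = 5;

    // Hypothetical stand-in for Kafka's LeaderNotAvailableException.
    static final class LeaderNotAvailable extends RuntimeException {
        LeaderNotAvailable(String msg) { super(msg); }
    }

    // Hypothetical per-topic metadata: an error code plus partition ids.
    static final class TopicMetadata {
        final int errorCode;
        final Set<Integer> partitions;
        TopicMetadata(int errorCode, Set<Integer> partitions) {
            this.errorCode = errorCode;
            this.partitions = partitions;
        }
    }

    // Hypothetical broker with topic auto-creation: the first `creationDelay`
    // lookups of a topic report LEADER_NOT_AVAILABLE and no partitions.
    static final class FakeBroker {
        private final Map<String, Integer> lookups = new HashMap<>();
        private final int creationDelay;
        private final int numPartitions;

        FakeBroker(int creationDelay, int numPartitions) {
            this.creationDelay = creationDelay;
            this.numPartitions = numPartitions;
        }

        TopicMetadata fetch(String topic) {
            int n = lookups.merge(topic, 1, Integer::sum);
            if (n <= creationDelay) {
                return new TopicMetadata(LEADER_NOT_AVAILABLE, Collections.emptySet());
            }
            Set<Integer> parts = new TreeSet<>();
            for (int p = 0; p < numPartitions; p++) {
                parts.add(p);
            }
            return new TopicMetadata(NO_ERROR, parts);
        }
    }

    // Modeled on the maybeThrowException(errorCode) check described above.
    static void maybeThrow(int errorCode) {
        if (errorCode != NO_ERROR) {
            throw new LeaderNotAvailable("errorCode=" + errorCode);
        }
    }

    // Old path: the error code becomes an exception, which drives a retry
    // until the broker reports valid partition metadata.
    static Set<Integer> partitionsWithRetry(FakeBroker broker, String topic, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            TopicMetadata md = broker.fetch(topic);
            try {
                maybeThrow(md.errorCode);
                return md.partitions;
            } catch (LeaderNotAvailable e) {
                // A real client would back off here before retrying.
            }
        }
        throw new IllegalStateException("No metadata for " + topic + " after " + maxAttempts + " attempts");
    }

    // Optimized path: the error code is ignored, so a topic that is still
    // being auto-created yields an empty partition set.
    static Set<Integer> partitionsNoErrorCheck(FakeBroker broker, String topic) {
        return broker.fetch(topic).partitions;
    }

    // Mirrors the check that produced "No tasks found" in the stack trace.
    static void validateTasks(Set<Integer> partitions) {
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException(
                "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
        }
    }

    public static void main(String[] args) {
        Set<Integer> retried = partitionsWithRetry(new FakeBroker(2, 4), "new-topic", 10);
        validateTasks(retried);
        System.out.println("retry path: " + retried);

        Set<Integer> unchecked = partitionsNoErrorCheck(new FakeBroker(2, 4), "new-topic");
        System.out.println("no-check path: " + unchecked);
        try {
            validateTasks(unchecked);
        } catch (IllegalArgumentException e) {
            System.out.println("job fails: " + e.getMessage());
        }
    }
}
```

Running `main` shows the retry path eventually seeing all four partitions, while the unchecked path returns an empty set on its single lookup and trips the same "No tasks found" validation as the real job. In this model, either restoring the error-code check on the optimized path or retrying on an empty result would avoid the failure; which of those suits Samza is the open question in the issue.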