Yi Pan (Data Infrastructure) commented on SAMZA-1018:

OK. I missed the email discussion. Posting it here for the record:

Hey Navina,

This was consistently reproducible both locally and in our integration test
environment. We have auto.create.topics.enable on our brokers (or, more
accurately, we do not have it disabled; it's the default). I did not mean to
imply there is a problem with the logic of the change in SAMZA-971; I
understand the desire to make fewer calls, but at the time I had not had a
chance to dig in and find the root cause of the difference. I think I've found
it now, though.

Prior to the 971 fix, we eventually wind up in 
KafkaSystemAdmin.getTopicsAndPartitionsByBroker(), which contains this code:


What I found was that this code was indeed throwing a LeaderNotAvailableException
when the topic did not already exist. That triggers a retry in
KafkaSystemAdmin.getSystemStreamMetadata(), which continues until the broker has
finished creating the topic and returns the correct partition metadata. The
optimized path introduced by the SAMZA-971 fix goes into
KafkaSystemAdmin.getSystemStreamPartitionCounts(). That method does not check this
errorCode and simply returns an empty set of partitions. Does that make sense?
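The following is a minimal Java sketch of the difference described above. It rests on stated assumptions. `TopicMetadata`, `SimulatedBroker`, `fetchWithErrorCheck`, `fetchPartitionsOnly` and the local `LeaderNotAvailableException` are hypothetical stand-ins, not Samza's or Kafka's actual classes or signatures. The simulated broker behaves like one that is auto-creating the topic: for the first few requests it reports error code 5 (Kafka's LEADER_NOT_AVAILABLE) with no partitions. The pre-SAMZA-971 style path turns that error code into an exception and retries. The optimized path never looks at the error code and hands back an empty partition list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TopicMetadataRetrySketch {
    // Kafka protocol error codes used in this sketch.
    static final short NONE = 0;
    static final short LEADER_NOT_AVAILABLE = 5;

    // Hypothetical, simplified topic metadata: an error code plus partition ids.
    static final class TopicMetadata {
        final short errorCode;
        final List<Integer> partitions;

        TopicMetadata(short errorCode, List<Integer> partitions) {
            this.errorCode = errorCode;
            this.partitions = partitions;
        }
    }

    // Hypothetical local exception standing in for Kafka's LeaderNotAvailableException.
    static final class LeaderNotAvailableException extends RuntimeException {
        LeaderNotAvailableException(String msg) { super(msg); }
    }

    // Hypothetical broker that is still auto-creating a single topic: the first
    // `pendingRequests` metadata requests report LEADER_NOT_AVAILABLE and no partitions.
    static final class SimulatedBroker {
        private int pendingRequests;
        private final int partitionCount;

        SimulatedBroker(int pendingRequests, int partitionCount) {
            this.pendingRequests = pendingRequests;
            this.partitionCount = partitionCount;
        }

        // The topic name is informational only; this broker models one topic.
        TopicMetadata fetchMetadata(String topic) {
            if (pendingRequests > 0) {
                pendingRequests--;
                return new TopicMetadata(LEADER_NOT_AVAILABLE, Collections.<Integer>emptyList());
            }
            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < partitionCount; i++) ids.add(i);
            return new TopicMetadata(NONE, ids);
        }
    }

    // Old-style path: check the error code, throw, and retry until the metadata is valid.
    static List<Integer> fetchWithErrorCheck(SimulatedBroker broker, String topic, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                TopicMetadata md = broker.fetchMetadata(topic);
                if (md.errorCode == LEADER_NOT_AVAILABLE) {
                    throw new LeaderNotAvailableException("leader not available for " + topic);
                }
                return md.partitions;
            } catch (LeaderNotAvailableException e) {
                if (attempt >= maxAttempts) throw e;
                // A real implementation would back off here before retrying.
            }
        }
    }

    // Optimized path: reads only the partition list and ignores the error code.
    static List<Integer> fetchPartitionsOnly(SimulatedBroker broker, String topic) {
        return broker.fetchMetadata(topic).partitions;
    }

    public static void main(String[] args) {
        System.out.println(fetchWithErrorCheck(new SimulatedBroker(3, 4), "new-topic", 10));
        System.out.println(fetchPartitionsOnly(new SimulatedBroker(3, 4), "new-topic"));
    }
}
```

Run as-is, the sketch prints `[0, 1, 2, 3]` and then `[]`. The error-checking path succeeds on its fourth request, once the simulated topic exists. The partitions-only path accepts the first (empty) answer.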


> Cannot consume non-existent topic
> ---------------------------------
>                 Key: SAMZA-1018
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1018
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.10.1
>         Environment: Kafka
>            Reporter: Tommy Becker
> When starting a stream job that consumes a topic that does not yet exist, the 
> job dies with the following exception: 
> {noformat}
> Exception in thread "main" java.lang.IllegalArgumentException: No tasks found. Likely due to no input partitions. Can't run a job with no tasks.
>        at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
>        at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
>        at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
>        at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
>        at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
>        at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
>        at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
>        at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
>        at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
>        at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
>        at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
>        at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> {noformat}
> This seems to be caused by the fix for SAMZA-971, specifically passing 
> partitionsMetadataOnly = true to the StreamMetadataCache in 
> JobModelManager#getInputStreamPartitions 
> https://github.com/apache/samza/commit/920f803a2e3dab809f4d7bb518259b0f4164407f
> Note that the input topic is subsequently created, so a restart of the job will
> likely succeed. Being able to consume topics that have not yet been created
> is useful because it avoids imposing a startup order between the jobs and the
> processes that produce to those topics. Setting partitionsMetadataOnly back to
> false fixed the issue for us, but I'm not sure if this is the best fix. For the
> record, we are using Kafka
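The next sketch traces how an empty partition set turns into the reported exception. It is a hedged Java illustration. `groupByPartition` and `validateTasks` here are hypothetical simplifications: one task per input partition, followed by a non-empty check whose message is copied from the stack trace in the report. They are not Samza's grouper or GroupByContainerCount code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class NoTasksSketch {
    // Hypothetical grouping: one task per input partition id.
    static Map<String, List<Integer>> groupByPartition(List<Integer> inputPartitions) {
        Map<String, List<Integer>> tasks = new TreeMap<>();
        for (int p : inputPartitions) {
            tasks.computeIfAbsent("Partition " + p, k -> new ArrayList<>()).add(p);
        }
        return tasks;
    }

    // Hypothetical validation mirroring the message in the reported exception.
    static void validateTasks(Map<String, List<Integer>> tasks) {
        if (tasks.isEmpty()) {
            throw new IllegalArgumentException(
                "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
        }
    }

    public static void main(String[] args) {
        // Partitions returned once the topic exists: validation passes.
        validateTasks(groupByPartition(Arrays.asList(0, 1, 2, 3)));
        // Empty partitions returned while the topic is still being created: validation fails.
        try {
            validateTasks(groupByPartition(Collections.<Integer>emptyList()));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The job does not need a special error for a missing topic. Once metadata lookup returns no partitions, the grouper produces zero tasks, and the job fails during validation.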
