[ https://issues.apache.org/jira/browse/SAMZA-151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jakob Homan reassigned SAMZA-151:
---------------------------------

    Assignee: Yan Fang

> Fail early when a consumer is misconfigured
> -------------------------------------------
>
>                 Key: SAMZA-151
>                 URL: https://issues.apache.org/jira/browse/SAMZA-151
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>         Attachments: SAMZA-151.patch
>
>
> Currently, if a SystemFactory fails to create a SystemConsumer and throws an 
> exception, we log this error with:
> {code}
>             info("Failed to create a consumer for %s, so skipping." format systemName)
> {code}
> SamzaContainer then continues on. If the system with the failed consumer is 
> configured to be used for an input stream or changelog, the missing 
> SystemConsumer will inevitably result in an exception later.
> For example, if you have a system called "kafka", and task.inputs is set to 
> "kafka.my-topic", but the KafkaSystemFactory fails to return a consumer, we 
> will log the error and continue. The resulting exception that's thrown much 
> later is:
> {noformat}
> java.util.NoSuchElementException: key not found: kafka
>         at scala.collection.MapLike$class.default(MapLike.scala:223)
>         at scala.collection.immutable.Map$EmptyMap$.default(Map.scala:73)
>         at scala.collection.MapLike$class.apply(MapLike.scala:134)
>         at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:73)
>         at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:175)
>         at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:156)
>         at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:155)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>         at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:155)
>         at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
>         at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
>         at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
>         at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:201)
>         at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:544)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:450)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
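> This is a poor experience: the failure surfaces far from its cause, and the 
> stack trace says nothing about the consumer that could not be created. We 
> should proactively check whether a system whose SystemConsumer failed is used 
> in task.inputs or as a changelog and, if so, fail outright with better logs.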
> On the producer side, we can't do this because tasks can produce to any 
> system they want (the system name is just a string). We simply fail when they 
> produce to a non-existent system.
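
The check proposed in the description could look roughly like the sketch below. This is not the attached patch and not Samza's actual API: ConsumerValidation, systemOf, missingConsumers and validate are hypothetical names, streams are assumed to be plain "system.stream" strings such as "kafka.my-topic", and IllegalStateException stands in for whatever exception the container would really throw.

```scala
// Hypothetical sketch only: none of these names come from the Samza code base.
object ConsumerValidation {

  /** Extracts the system name from a "system.stream" string, e.g. "kafka" from "kafka.my-topic". */
  def systemOf(systemStream: String): String = {
    val idx = systemStream.indexOf('.')
    require(idx > 0, "Expected 'system.stream' but got '%s'" format systemStream)
    systemStream.substring(0, idx)
  }

  /** Returns the systems needed for inputs or changelogs that have no consumer. */
  def missingConsumers(
      inputStreams: Set[String],
      changelogStreams: Set[String],
      systemsWithConsumers: Set[String]): Set[String] =
    (inputStreams ++ changelogStreams).map(systemOf) -- systemsWithConsumers

  /** Fails at startup, with a message naming the system, instead of much later. */
  def validate(
      inputStreams: Set[String],
      changelogStreams: Set[String],
      systemsWithConsumers: Set[String]): Unit = {
    val missing = missingConsumers(inputStreams, changelogStreams, systemsWithConsumers)
    if (missing.nonEmpty) {
      throw new IllegalStateException(
        "No consumer could be created for system(s) [%s], which are used as an input or changelog."
          format missing.toSeq.sorted.mkString(", "))
    }
  }
}
```

Calling validate(Set("kafka.my-topic"), Set.empty, Set.empty) would throw right away and name "kafka" in the message, rather than failing later with "key not found: kafka". Systems that are only produced to are deliberately left out of the check, for the reason given in the last paragraph of the description.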



--
This message was sent by Atlassian JIRA
(v6.2#6252)
