[ https://issues.apache.org/jira/browse/SAMZA-151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jakob Homan reassigned SAMZA-151:
---------------------------------
Assignee: Yan Fang
> Fail early when a consumer is misconfigured
> -------------------------------------------
>
> Key: SAMZA-151
> URL: https://issues.apache.org/jira/browse/SAMZA-151
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Yan Fang
> Labels: newbie
> Attachments: SAMZA-151.patch
>
>
> Currently, if a SystemFactory fails to create a SystemConsumer and throws an
> exception, we log this error with:
> {code}
> info("Failed to create a consumer for %s, so skipping." format systemName)
> {code}
> SamzaContainer then continues on. If the system with the failed consumer is
> configured to be used for an input stream or changelog, the lack of a
> SystemConsumer for that system will ABSOLUTELY result in an exception later.
> For example, if you have a system called "kafka", and task.inputs is set to
> "kafka.my-topic", but the KafkaSystemFactory fails to return a consumer, we
> will log the error and continue. The resulting exception that's thrown much
> later is:
> {noformat}
> java.util.NoSuchElementException: key not found: kafka
> at scala.collection.MapLike$class.default(MapLike.scala:223)
> at scala.collection.immutable.Map$EmptyMap$.default(Map.scala:73)
> at scala.collection.MapLike$class.apply(MapLike.scala:134)
> at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:73)
> at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:175)
> at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:156)
> at org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$6.apply(TaskInstance.scala:155)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:155)
> at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
> at org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:544)
> at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
> at scala.collection.MapLike$DefaultValuesIterable$$anonfun$foreach$4.apply(MapLike.scala:201)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:201)
> at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:544)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:450)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
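The trace bottoms out in `Map.apply`. On a Scala immutable `Map`, `apply` with a missing key throws `NoSuchElementException` whose message is just `key not found: <key>`, so the only clue left is the bare system name. Below is a minimal sketch that contrasts that behavior with a lookup naming the misconfigured system. It assumes consumers are held in a plain `Map` keyed by system name. `ConsumerLookup`, `consumers` and `lookupConsumer` are hypothetical names, not Samza code, and strings stand in for real consumers.

```scala
object ConsumerLookup {
  // Hypothetical per-system consumer map; "kafka" is absent because its
  // SystemFactory threw and the container skipped it.
  val consumers: Map[String, String] = Map("other-system" -> "consumer-for-other-system")

  // Hypothetical helper: same lookup as consumers(systemName), but a missing
  // system produces an error that names it instead of "key not found: kafka".
  def lookupConsumer(systemName: String): String =
    consumers.getOrElse(systemName,
      throw new IllegalStateException(
        s"No SystemConsumer exists for system '$systemName'; " +
          "its SystemFactory may have failed to create a consumer."))
}
```

A better message alone still surfaces late, deep inside consumer registration. That is why the issue asks for an up-front check rather than only a friendlier lookup.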
> This is a pretty bad experience. We should proactively check that no system
> whose SystemConsumer failed to be created is used in either task.inputs or a
> changelog, and fail outright with a clear error message.
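Here is a minimal sketch of the proposed fail-fast check, under stated assumptions. It assumes `task.inputs` is a comma-separated list of `system.stream` names (as in `kafka.my-topic`). It also assumes changelogs are given in the same `system.stream` form and that the system name is everything before the first `.`. `ConsumerValidation`, `systemOf` and `validateConsumers` are hypothetical names, not existing Samza APIs.

```scala
object ConsumerValidation {
  // Returns the system part of a "system.stream" name, e.g. "kafka" for
  // "kafka.my-topic" (assumes the system name ends at the first '.').
  def systemOf(systemStream: String): String = {
    val idx = systemStream.indexOf('.')
    require(idx > 0 && idx < systemStream.length - 1,
      s"Expected system.stream, got '$systemStream'")
    systemStream.substring(0, idx)
  }

  // Hypothetical check, run once after consumer creation: throw if any
  // system whose SystemConsumer could not be created is needed as an input
  // or for a changelog. Systems used only for output are not checked.
  def validateConsumers(
      failedSystems: Set[String],
      taskInputs: String,           // raw value, e.g. "kafka.my-topic,kafka.other"
      changelogStreams: Seq[String] // e.g. Seq("kafka.my-store-changelog")
  ): Unit = {
    val inputSystems =
      taskInputs.split(",").map(_.trim).filter(_.nonEmpty).map(systemOf).toSet
    val changelogSystems = changelogStreams.map(systemOf).toSet
    val problems =
      (failedSystems intersect inputSystems).toSeq.sorted.map(s => s"$s (used in task.inputs)") ++
        (failedSystems intersect changelogSystems).toSeq.sorted.map(s => s"$s (used for a changelog)")
    if (problems.nonEmpty)
      throw new IllegalStateException(
        "Failed to create a SystemConsumer for required system(s): " + problems.mkString(", "))
  }
}
```

For the example in this issue, `validateConsumers(Set("kafka"), "kafka.my-topic", Seq.empty)` would stop the container at startup with a message naming `kafka` and `task.inputs`. Without the check, the failure surfaces only later, inside `SystemConsumers.register`.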
> On the producer side, we can't do this, because tasks can produce to any
> system they want (the target system is just a string chosen at send time).
> We simply fail when a task produces to a nonexistent system.
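The destination system is only known when a task sends a message, so the producer-side failure can only happen at send time. The sketch below is hedged: it shows that run-time check with hypothetical `OutgoingMessage` and `ProducerRouter` types, and plain functions stand in for real producers. It is not Samza's actual producer API.

```scala
object ProducerLookup {
  // Hypothetical outgoing message: the target system is a plain string
  // chosen by the task at run time, so it cannot be validated up front.
  final case class OutgoingMessage(systemName: String, streamName: String, payload: String)

  // Hypothetical router from system name to a send function.
  final class ProducerRouter(producers: Map[String, String => Unit]) {
    def send(msg: OutgoingMessage): Unit = producers.get(msg.systemName) match {
      case Some(produce) => produce(msg.payload)
      case None =>
        // Unknown system: fail immediately and name the offending system.
        throw new IllegalArgumentException(
          s"Cannot send to ${msg.systemName}.${msg.streamName}: " +
            s"no producer is configured for system '${msg.systemName}'")
    }
  }
}
```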
--
This message was sent by Atlassian JIRA
(v6.2#6252)