[ https://issues.apache.org/jira/browse/SAMZA-169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martin Kleppmann updated SAMZA-169:
-----------------------------------
Attachment: SAMZA-169.patch
Thanks [~criccomini] for diagnosing the bug: KafkaSystemAdmin would not retry
if the TopicMetadata request returned an error code (it would only retry if the
PartitionMetadata request returned an error code).
I've added the missing error check, and added a test for this particular
condition. Patch attached. I had to slightly change the KafkaSystemAdmin API in
order to mock out the call to Kafka. Not sure if this is the most elegant way
of testing it; please let me know if you can think of a better way.
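For readers without the patch at hand, here is a minimal sketch of the idea. It is not the code in SAMZA-169.patch and not the real KafkaSystemAdmin API: the class names, the fetchWithRetry helper and the Supplier-based hook are all hypothetical and stand in for the Kafka metadata call. The sketch retries when either the topic-level or a partition-level error code is set, and takes the fetch as a parameter so a test can substitute a fake. The error code constants are assumed to follow the Kafka protocol (0 for no error, 5 for LeaderNotAvailable).

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for Kafka's metadata classes; not the real API.
class MetadataRetrySketch {
    public static final short NO_ERROR = 0;
    public static final short LEADER_NOT_AVAILABLE = 5; // assumed Kafka protocol code

    public static final class PartitionMeta {
        public final int partitionId;
        public final short errorCode;
        public PartitionMeta(int partitionId, short errorCode) {
            this.partitionId = partitionId;
            this.errorCode = errorCode;
        }
    }

    public static final class TopicMeta {
        public final String topic;
        public final short errorCode;
        public final List<PartitionMeta> partitions;
        public TopicMeta(String topic, short errorCode, List<PartitionMeta> partitions) {
            this.topic = topic;
            this.errorCode = errorCode;
            this.partitions = partitions;
        }
    }

    // True if the topic itself or any of its partitions reports an error.
    public static boolean hasError(TopicMeta meta) {
        if (meta.errorCode != NO_ERROR) {
            return true; // topic-level check: the one described above as missing
        }
        for (PartitionMeta p : meta.partitions) {
            if (p.errorCode != NO_ERROR) {
                return true; // partition-level check: already present before the fix
            }
        }
        return false;
    }

    // Calls the supplied fetch until it returns error-free metadata or the
    // attempts run out. Real code would back off between attempts; that is
    // left out here to keep the sketch short.
    public static TopicMeta fetchWithRetry(Supplier<TopicMeta> fetch, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            TopicMeta meta = fetch.get();
            if (!hasError(meta)) {
                return meta;
            }
        }
        throw new IllegalStateException("Metadata still had errors after " + maxAttempts + " attempts");
    }
}
```

Passing the fetch in as a Supplier is only one way to make the Kafka call replaceable in a test: a fake can return a topic-level error a couple of times and then a clean response, and the test can count how often it was called.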
This patch needs to be applied on top of the SAMZA-168 patch. RB won't let me
create a review that's not based off master (at least I don't know how), so
I'll create the RB when SAMZA-168 is committed.
> Task initialization fails if changelog stream does not already exist
> --------------------------------------------------------------------
>
> Key: SAMZA-169
> URL: https://issues.apache.org/jira/browse/SAMZA-169
> Project: Samza
> Issue Type: Bug
> Components: kafka, kv
> Affects Versions: 0.6.0
> Reporter: Martin Kleppmann
> Assignee: Martin Kleppmann
> Fix For: 0.7.0
>
> Attachments: SAMZA-169.patch
>
>
> The first time you run a job that uses state with a changelog (such as in
> SAMZA-152), you get the following exception:
> {noformat}
> 2014-03-04 18:15:20 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [partition=Partition [partition=1], system=kafka, stream=wikipedia-stats-changelog].
> at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:84)
> at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:84)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:81)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:81)
> at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:60)
> at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:103)
> at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:579)
> at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:579)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:579)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> If debug logging is turned on, the following telltale line appears in the log
> earlier:
> {noformat}
> 2014-03-04 18:15:20 KafkaSystemAdmin [DEBUG] Got metadata for streams: Map(wikipedia-stats-changelog -> {TopicMetadata for topic wikipedia-stats-changelog ->
> No partition metadata for topic wikipedia-stats-changelog due to kafka.common.LeaderNotAvailableException})
> {noformat}
> Full log: https://gist.github.com/ept/1fecad1b2d79797990a8