InitableTask tas kwas missing. That responds to another problem that I was experiencing (and left for later). Anyway the exception was still there until I commented the changelog definition line in the properties file:
#stores.test12db.changelog=kafka.test12db-changelog As I understand it in case of job going down information will be lost. That is not a real issue as I am storing temporal information there. What I am seeing is something that does not work as I expected. Maybe I am not understanding correctly how the system works. I need that a job has access to the information previously stored in the storage. I was planning a loader job that on receiving some messages with data it stores them in RocksDb and that information should be consumed by a different consumer job to use it for calculation. I see that in the loader job I can put and get information correctly. When I try to access the same storage from a different job I just get null results. How I am supposed to load the DB to be able to use it from the consuming job? Is RocksDB the tool to use or should I use any other technique? Thanks, Jordi -----Mensaje original----- De: Yi Pan [mailto:nickpa...@gmail.com] Enviado el: martes, 11 de agosto de 2015 3:27 Para: dev@samza.apache.org Asunto: Re: Missing a change log offset for SystemStreamPartition Hi, Jordi, Agree with Yan. More specifically, your class definition should be something like: {code} public class testStore implements StreamTask, InitableTask { ... } {code} On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang <yanfang...@gmail.com> wrote: > Hi Jordi, > > I think, you need to implement the *InitableTask* interface. > Otherwise, the content in the init method will not be processed. > > Thanks, > > Fang, Yan > yanfang...@gmail.com > > On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri > <jbl...@nextel.es> > wrote: > > > Just for making it easier to reproduce the problem I just reduced > > the > code > > of the job to the minimum: > > > > package test; > > > > import org.apache.samza.config.Config; import > > org.apache.samza.storage.kv.KeyValueStore; > > import org.apache.samza.system.IncomingMessageEnvelope; > > import org.apache.samza.task.MessageCollector; > > import org.apache.samza.task.StreamTask; import > > org.apache.samza.task.TaskContext; > > import org.apache.samza.task.TaskCoordinator; > > > > public class testStore implements StreamTask { > > private KeyValueStore<String, String> storestp; > > > > public void init(Config config, TaskContext context) { > > this.storestp = (KeyValueStore<String, String>) > > context.getStore("test11db"); > > } > > > > public void process (IncomingMessageEnvelope envelope, > > MessageCollector collector, > > TaskCoordinator coordinator) > > { > > String msgin = (String) envelope.getMessage(); > > storestp.put("test1",msgin); > > } > > } > > > > The properties file contains this: > > > > task.class=test.testStore > > job.name=test.testStore > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory > > yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar. > > gz > > > > > > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemF > actory > > > systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02: > 2181 > > > > > systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01: > 9093,kfk-kafka02:9092,kfk-kafka02:9093 > > > > > systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka > 01:9093,kfk-kafka02:9092,kfk-kafka02:909 > > > > # Declare that we want our job's checkpoints to be written to Kafka > > > > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpo > intManagerFactory > > task.checkpoint.system=kafka > > > > # The job consumes a topic called "configtpc" from the "kafka" > > system task.inputs=kafka.configtpc > > > > # Define a serializer/deserializer called "json" which parses JSON > messages > > > > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerde > Factory > > > > > serializers.registry.string.class=org.apache.samza.serializers.StringS > erdeFactory > > > > # Serializer for the system > > systems.kafka.samza.msg.serde=string > > systems.kafka.streams.tracetpc.samza.msg.serde=json > > > > # Use the key-value store implementation for a store called "my-store" > > > > > stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueSto > rageEngineFactory > > > > # Use the Kafka topic "routingdb-changelog" as the changelog stream > > for this store. > > # This enables automatic recovery of the store after a failure. If > > you don't # configure this, no changelog stream will be generated. > > stores.test11db.changelog=kafka.test11db-changelog > > > > # Encode keys and values in the store as UTF-8 strings. > > stores.test11db.key.serde=string > > stores.test11db.msg.serde=string > > > > # Commit checkpoints every 1 seconds task.commit.ms=1000 > > > > With this, I am getting just the same error: > > > > java version "1.7.0_79" > > OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1) > > OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode) log4j:WARN No > > appenders could be found for logger > > (org.apache.samza.metrics.JmxServer). > > log4j:WARN Please initialize the log4j system properly. > > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig > > for more info. > > Exception in thread "main" org.apache.samza.SamzaException: Missing > > a change log offset for SystemStreamPartition [kafka, > > test11db-changelog, > 2]. > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$ > $anonfun$1.apply(TaskStorageManager.scala:87) > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$ > $anonfun$1.apply(TaskStorageManager.scala:87) > > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > > at scala.collection.AbstractMap.getOrElse(Map.scala:58) > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > apply(TaskStorageManager.scala:87) > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > apply(TaskStorageManager.scala:84) > > at > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T > raversableLike.scala:772) > > at > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike > .scala:245) > > at > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike > .scala:245) > > at > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T > raversableLike.scala:772) > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > > at > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc > ala:771) > > at > scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245) > > at > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc > ala:771) > > at > > > org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage > Manager.scala:84) > > at > > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc > ala:63) > > at > > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala > :88) > > at > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply > (SamzaContainer.scala:607) > > at > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply > (SamzaContainer.scala:607) > > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > at > > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) > > at > > > org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s > cala:607) > > at > > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550) > > at > > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca > la:108) > > at > > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87) > > at > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) > > > > > > The job fails even when there is no message sent to the input topic. > > > > Samza is version 0.9.1 and kafka 0.8.2. > > > > Thanks, > > > > Jordi > > > > -----Mensaje original----- > > De: Jordi Blasi Uribarri [mailto:jbl...@nextel.es] Enviado el: > > lunes, 10 de agosto de 2015 10:26 > > Para: dev@samza.apache.org > > Asunto: RE: Missing a change log offset for SystemStreamPartition > > > > Hi, > > > > I have migrated samza to the last versión and recreated the job with > > a > new > > store name so the streams were created clean. I am getting the same > error: > > > > java version "1.7.0_79" > > OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1) > > OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode) log4j:WARN No > > appenders could be found for logger (org.apache.samza.metrics.JmxServer). > > log4j:WARN Please initialize the log4j system properly. > > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig > > for more info. > > Exception in thread "main" org.apache.samza.SamzaException: Missing > > a change log offset for SystemStreamPartition [kafka, commdb-changelog, 2]. > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$ > $anonfun$1.apply(TaskStorageManager.scala:87) > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$ > $anonfun$1.apply(TaskStorageManager.scala:87) > > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > > at scala.collection.AbstractMap.getOrElse(Map.scala:58) > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > apply(TaskStorageManager.scala:87) > > at > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > apply(TaskStorageManager.scala:84) > > at > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T > raversableLike.scala:772) > > at > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike > .scala:245) > > at > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike > .scala:245) > > at > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T > raversableLike.scala:772) > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > > at > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc > ala:771) > > at > scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245) > > at > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc > ala:771) > > at > > > org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage > Manager.scala:84) > > at > > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc > ala:63) > > at > > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala > :88) > > at > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply > (SamzaContainer.scala:607) > > at > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply > (SamzaContainer.scala:607) > > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > at > > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) > > at > > > org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s > cala:607) > > at > > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550) > > at > > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca > la:108) > > at > > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87) > > at > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) > > > > Is there any other info I can attach to help find the problem? > > > > Thanks, > > > > Jordi > > > > -----Mensaje original----- > > De: Yan Fang [mailto:yanfang...@gmail.com] Enviado el: viernes, 07 > > de agosto de 2015 23:21 > > Para: dev@samza.apache.org > > Asunto: Re: Missing a change log offset for SystemStreamPartition > > > > Hi Jordi, > > > > Sorry for getting you back late. Was quite busy yesterday. > > > > I think the reason of your error is that you mismatched Samza > > version and Kafka version. > > > > Seems that, you are using Samza 0.8.1 with Kafka 0.8.2, which is not > > supported. > > > > So my suggestion is to upgrade to *Samza 0.9.1*, then use *Kafka 0.8.2*. > > This match is proved working. > > > > Hope this helps you. > > > > Thanks, > > > > > > Fang, Yan > > yanfang...@gmail.com > > > > On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri > > <jbl...@nextel.es> > > wrote: > > > > > I changed the job name and the store name. I was defining two > > > different stores and in case that was the problem, I also > > > eliminated > the > > second one. > > > I am getting the same exception. > > > > > > Exception in thread "main" org.apache.samza.SamzaException: > > > Missing a change log offset for SystemStreamPartition [kafka, > > > testdb-changelog, > 2]. > > > at > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$ > $anonfun$1.apply(TaskStorageManager.scala:87) > > > at > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$ > $anonfun$1.apply(TaskStorageManager.scala:87) > > > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > > > at scala.collection.AbstractMap.getOrElse(Map.scala:58) > > > at > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > apply(TaskStorageManager.scala:87) > > > at > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > apply(TaskStorageManager.scala:84) > > > at > > > > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T > raversableLike.scala:772) > > > at > > > > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike > .scala:245) > > > at > > > > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike > .scala:245) > > > at > > > > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T > raversableLike.scala:772) > > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > > > at > > > > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc > ala:771) > > > at > > scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245) > > > at > > > > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc > ala:771) > > > at > > > > > > org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage > Manager.scala:84) > > > at > > > > > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc > ala:63) > > > at > > > > > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala > :88) > > > at > > > > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply > (SamzaContainer.scala:607) > > > at > > > > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply > (SamzaContainer.scala:607) > > > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > > > at > scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > > at > > > > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:2 > 06) > > > at > > > > > > org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s > cala:607) > > > at > > > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550) > > > at > > > > > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca > la:108) > > > at > > > > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:8 > 7) > > > at > > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scal > > > a) > > > > > > As I have the autocreate configured in Kafka I am not creating > > > anything for the store. Is that ok? > > > > > > By the way, is there any problem on having two different stores? > > > > > > Thanks, > > > > > > Jordi > > > > > > -----Mensaje original----- > > > De: Yan Fang [mailto:yanfang...@gmail.com] Enviado el: miércoles, > > > 05 de agosto de 2015 20:23 > > > Para: dev@samza.apache.org > > > Asunto: Re: Missing a change log offset for SystemStreamPartition > > > > > > Hi Jordi, > > > > > > I wonder, the reason of your first exception is that, you changed > > > the task number (partition number of your input stream), but still > > > were using the same changelog stream. It is trying to send to the > > > partition 2, which does not exist? > > > > > > Can you reproduce this exception in a new job? (new store name, > > > new job > > > name) > > > > > > The second exception is caused by the wrong offset format, I believe. > > > > > > Let me know how the new job goes. > > > > > > Thanks, > > > > > > Fang, Yan > > > yanfang...@gmail.com > > > > > > On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri > > > <jbl...@nextel.es> > > > wrote: > > > > > > > Hi, > > > > > > > > I am trying to use the Keystore to manage some state information. > > > > Basically this is the code I am using. As long as I have tested, > > > > the rest is working correctly. > > > > > > > > private KeyValueStore<String, String> storestp; > > > > > > > > public void init(Config config, TaskContext context) { > > > > this.storestp = (KeyValueStore<String, String>) > > > > context.getStore("stepdb"); > > > > } > > > > > > > > public void process(IncomingMessageEnvelope envelope, > > > > MessageCollector collector, > > > > TaskCoordinator coordinator) > > > > { > > > > … > > > > String str = storestp.get(code) > > > > … > > > > } > > > > > > > > When I load it, it goes to running but, whe I send the messages > > > > through Kafka stream It goes to Failed state. I have found this > > > Exception: > > > > Exception in thread "main" org.apache.samza.SamzaException: > > > > Missing a change log offset for SystemStreamPartition [kafka, > > stepdb-changelog, 2]. > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumer > > > s$3$ > > > $anonfun$1.apply(TaskStorageManager.scala:87) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumer > > > s$3$ > > > $anonfun$1.apply(TaskStorageManager.scala:87) > > > > at > scala.collection.MapLike$class.getOrElse(MapLike.scala:128) > > > > at scala.collection.AbstractMap.getOrElse(Map.scala:58) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > > > apply(TaskStorageManager.scala:87) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3. > > > apply(TaskStorageManager.scala:84) > > > > at > > > > > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.app > > > ly(T > > > raversableLike.scala:772) > > > > at > > > > > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(Map > > > Like > > > .scala:245) > > > > at > > > > > > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(Map > > > Like > > > .scala:245) > > > > at > > > > > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.app > > > ly(T > > > raversableLike.scala:772) > > > > at scala.collection.immutable.Map$Map2.foreach(Map.scala:130) > > > > at > > > > > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik > > > e.sc > > > ala:771) > > > > at > > > scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245) > > > > at > > > > > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik > > > e.sc > > > ala:771) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager.startConsumers(TaskSto > > > rage > > > Manager.scala:84) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManage > > > r.sc > > > ala:63) > > > > at > > > > > > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.s > > > cala > > > :88) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a > > > pply > > > (SamzaContainer.scala:607) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a > > > pply > > > (SamzaContainer.scala:607) > > > > at > scala.collection.Iterator$class.foreach(Iterator.scala:727) > > > > at > > scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > > > at > > > > > > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala > > :206) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer.startStores(SamzaContain > > > er.s > > > cala:607) > > > > at > > > > > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550 > ) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer > > > .sca > > > la:108) > > > > at > > > > > > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala > > :87) > > > > at > > > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.sc > > > > ala) > > > > > > > > I have seen that the stepdb-changelog stream exists in Kafka. As > > > > a try to regenerate the missing offset and tes it I have > > > > connected through the command line and send a message to the > > > > stream. It was received > > > correctly. > > > > Now I am seeing the following Exception: > > > > > > > > Exception in thread "main" java.lang.NullPointerException > > > > at > > > > > > > scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOp > > > s.sc > > > ala:126) > > > > at > > > > scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126) > > > > at scala.collection.SeqLike$class.size(SeqLike.scala:106) > > > > at > > > > scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120) > > > > at > > > > > > > org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore > > > $1.a > > > pply(KeyValueStorageEngine.scala:94) > > > > at > > > > > > > org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore > > > $1.a > > > pply(KeyValueStorageEngine.scala:79) > > > > at > scala.collection.Iterator$class.foreach(Iterator.scala:727) > > > > at > > scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > > > at > > > > > > > org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValue > > > Stor > > > ageEngine.scala:79) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores > > > $3.a > > > pply(TaskStorageManager.scala:112) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores > > > $3.a > > > pply(TaskStorageManager.scala:106) > > > > at > > > > > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.app > > > ly(T > > > raversableLike.scala:772) > > > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > > > > at > > > > > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik > > > e.sc > > > ala:771) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStor > > > ageM > > > anager.scala:106) > > > > at > > > > > > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManage > > > r.sc > > > ala:64) > > > > at > > > > > > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.s > > > cala > > > :88) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a > > > pply > > > (SamzaContainer.scala:607) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a > > > pply > > > (SamzaContainer.scala:607) > > > > at > scala.collection.Iterator$class.foreach(Iterator.scala:727) > > > > at > > scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > > > at > > > > > > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala > > :206) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer.startStores(SamzaContain > > > er.s > > > cala:607) > > > > at > > > > > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550 > ) > > > > at > > > > > > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer > > > .sca > > > la:108) > > > > at > > > > > > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala > > :87) > > > > at > > > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.sc > > > > ala) > > > > > > > > Is there something wrong? > > > > > > > > Thanks, > > > > > > > > Jordi > > > > ________________________________ Jordi Blasi Uribarri Área I+D+i > > > > > > > > jbl...@nextel.es > > > > Oficina Bilbao > > > > > > > > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png] > > > > > > > > > >