InitableTask tas kwas missing. That responds to another problem that I was 
experiencing (and left for later). Anyway the exception was still there until I 
commented the changelog definition line in the properties file:

#stores.test12db.changelog=kafka.test12db-changelog

As I understand it in case of job going down information will be lost. That is 
not a real issue as I am storing temporal information there. 

What I am seeing is something that does not work as I expected. Maybe I am not 
understanding correctly how the system works. I need that a job has access to 
the information previously stored in the storage. I was planning a loader job 
that on receiving some messages with data it stores them in RocksDb and that 
information should be consumed by a different consumer job to use it for 
calculation.

I see that in the loader job I can put and get information correctly. When I 
try to access the same storage from a different job I just get null results.

How I am supposed to load the DB to be able to use it from the consuming job? 
Is RocksDB the tool to use or should I use any other technique?

Thanks,

        Jordi



-----Mensaje original-----
De: Yi Pan [mailto:nickpa...@gmail.com] 
Enviado el: martes, 11 de agosto de 2015 3:27
Para: dev@samza.apache.org
Asunto: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi,

Agree with Yan. More specifically, your class definition should be something 
like:
{code}
public class testStore implements StreamTask, InitableTask { ...
}
{code}

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Jordi,
>
> I think, you need to implement the *InitableTask* interface. 
> Otherwise, the content in the init method will not be processed.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri 
> <jbl...@nextel.es>
> wrote:
>
> > Just for making it easier to reproduce the problem I just reduced 
> > the
> code
> > of the job to the minimum:
> >
> > package test;
> >
> > import org.apache.samza.config.Config; import 
> > org.apache.samza.storage.kv.KeyValueStore;
> > import org.apache.samza.system.IncomingMessageEnvelope;
> > import org.apache.samza.task.MessageCollector;
> > import org.apache.samza.task.StreamTask; import 
> > org.apache.samza.task.TaskContext;
> > import org.apache.samza.task.TaskCoordinator;
> >
> > public class testStore implements StreamTask {
> >         private KeyValueStore<String, String> storestp;
> >
> >          public void init(Config config, TaskContext context) {
> >                     this.storestp = (KeyValueStore<String, String>) 
> > context.getStore("test11db");
> >                   }
> >
> >         public void process (IncomingMessageEnvelope envelope,
> >                        MessageCollector collector,
> >                        TaskCoordinator coordinator)
> >         {
> >                 String msgin = (String) envelope.getMessage();
> >                 storestp.put("test1",msgin);
> >         }
> > }
> >
> > The properties file contains this:
> >
> > task.class=test.testStore
> > job.name=test.testStore
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.
> > gz
> >
> >
> >
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemF
> actory
> >
> systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:
> 2181
> >
> >
> systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:
> 9093,kfk-kafka02:9092,kfk-kafka02:9093
> >
> >
> systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka
> 01:9093,kfk-kafka02:9092,kfk-kafka02:909
> >
> > # Declare that we want our job's checkpoints to be written to Kafka
> >
> >
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpo
> intManagerFactory
> > task.checkpoint.system=kafka
> >
> > # The job consumes a topic called "configtpc" from the "kafka" 
> > system task.inputs=kafka.configtpc
> >
> > # Define a serializer/deserializer called "json" which parses JSON
> messages
> >
> >
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerde
> Factory
> >
> >
> serializers.registry.string.class=org.apache.samza.serializers.StringS
> erdeFactory
> >
> > # Serializer for the system
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.streams.tracetpc.samza.msg.serde=json
> >
> > # Use the key-value store implementation for a store called "my-store"
> >
> >
> stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueSto
> rageEngineFactory
> >
> > # Use the Kafka topic "routingdb-changelog" as the changelog stream 
> > for this store.
> > # This enables automatic recovery of the store after a failure. If 
> > you don't # configure this, no changelog stream will be generated.
> > stores.test11db.changelog=kafka.test11db-changelog
> >
> > # Encode keys and values in the store as UTF-8 strings.
> > stores.test11db.key.serde=string
> > stores.test11db.msg.serde=string
> >
> > # Commit checkpoints every 1 seconds task.commit.ms=1000
> >
> > With this, I am getting just the same error:
> >
> > java version "1.7.0_79"
> > OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1) 
> > OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode) log4j:WARN No 
> > appenders could be found for logger 
> > (org.apache.samza.metrics.JmxServer).
> > log4j:WARN Please initialize the log4j system properly.
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig 
> > for more info.
> > Exception in thread "main" org.apache.samza.SamzaException: Missing 
> > a change log offset for SystemStreamPartition [kafka, 
> > test11db-changelog,
> 2].
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:87)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:84)
> >         at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> >         at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> >         at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> >         at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >         at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> >         at
> scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> >         at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage
> Manager.scala:84)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc
> ala:63)
> >         at
> >
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala
> :88)
> >         at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> >         at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >         at
> >
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s
> cala:607)
> >         at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> >         at
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
> la:108)
> >         at
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >         at
> > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> >
> > The job fails even when there is no message sent to the input topic.
> >
> > Samza is version 0.9.1 and kafka 0.8.2.
> >
> > Thanks,
> >
> >   Jordi
> >
> > -----Mensaje original-----
> > De: Jordi Blasi Uribarri [mailto:jbl...@nextel.es] Enviado el: 
> > lunes, 10 de agosto de 2015 10:26
> > Para: dev@samza.apache.org
> > Asunto: RE: Missing a change log offset for SystemStreamPartition
> >
> > Hi,
> >
> > I have migrated samza to the last versión and recreated the job with 
> > a
> new
> > store name so the streams were created clean. I am getting the same
> error:
> >
> > java version "1.7.0_79"
> > OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1) 
> > OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode) log4j:WARN No 
> > appenders could be found for logger (org.apache.samza.metrics.JmxServer).
> > log4j:WARN Please initialize the log4j system properly.
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig 
> > for more info.
> > Exception in thread "main" org.apache.samza.SamzaException: Missing 
> > a change log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:87)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:84)
> >         at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> >         at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> >         at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> >         at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >         at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> >         at
> scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> >         at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage
> Manager.scala:84)
> >         at
> >
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc
> ala:63)
> >         at
> >
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala
> :88)
> >         at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> >         at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >         at
> >
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s
> cala:607)
> >         at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> >         at
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
> la:108)
> >         at
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >         at
> > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> > Is there any other info I can attach to help find the problem?
> >
> > Thanks,
> >
> >   Jordi
> >
> > -----Mensaje original-----
> > De: Yan Fang [mailto:yanfang...@gmail.com] Enviado el: viernes, 07 
> > de agosto de 2015 23:21
> > Para: dev@samza.apache.org
> > Asunto: Re: Missing a change log offset for SystemStreamPartition
> >
> > Hi Jordi,
> >
> > Sorry for getting you back late. Was quite busy yesterday.
> >
> > I think the reason of your error is that you mismatched Samza 
> > version and Kafka version.
> >
> > Seems that, you are using Samza 0.8.1 with Kafka 0.8.2, which is not 
> > supported.
> >
> > So my suggestion is to upgrade to *Samza 0.9.1*, then use *Kafka 0.8.2*.
> > This match is proved working.
> >
> > Hope this helps you.
> >
> > Thanks,
> >
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri 
> > <jbl...@nextel.es>
> > wrote:
> >
> > > I changed the job name and the store name. I was defining two 
> > > different stores and in case that was the problem, I also 
> > > eliminated
> the
> > second one.
> > > I am getting the same exception.
> > >
> > > Exception in thread "main" org.apache.samza.SamzaException: 
> > > Missing a change log offset for SystemStreamPartition [kafka, 
> > > testdb-changelog,
> 2].
> > >         at
> > >
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> > >         at
> > >
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> > >         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > >         at
> > >
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:87)
> > >         at
> > >
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:84)
> > >         at
> > >
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> > >         at
> > >
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> > >         at
> > >
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> > >         at
> > >
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> > >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > >         at
> > >
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> > >         at
> > scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> > >         at
> > >
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> > >         at
> > >
> >
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage
> Manager.scala:84)
> > >         at
> > >
> >
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc
> ala:63)
> > >         at
> > >
> >
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala
> :88)
> > >         at
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> > >         at
> > >
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> > >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >         at
> > >
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:2
> 06)
> > >         at
> > >
> >
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s
> cala:607)
> > >         at
> > > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > >         at
> > >
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
> la:108)
> > >         at
> > >
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:8
> 7)
> > >         at
> > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scal
> > > a)
> > >
> > > As I have the autocreate configured in Kafka I am not creating 
> > > anything for the store. Is that ok?
> > >
> > > By the way, is there any problem on having two different stores?
> > >
> > > Thanks,
> > >
> > >     Jordi
> > >
> > > -----Mensaje original-----
> > > De: Yan Fang [mailto:yanfang...@gmail.com] Enviado el: miércoles, 
> > > 05 de agosto de 2015 20:23
> > > Para: dev@samza.apache.org
> > > Asunto: Re: Missing a change log offset for SystemStreamPartition
> > >
> > > Hi Jordi,
> > >
> > > I wonder, the reason of your first exception is that, you changed 
> > > the task number (partition number of your input stream), but still 
> > > were using the same changelog stream. It is trying to send to the 
> > > partition 2, which does not exist?
> > >
> > > Can you reproduce this exception in a new job? (new store name, 
> > > new job
> > > name)
> > >
> > > The second exception is caused by the wrong offset format, I believe.
> > >
> > > Let me know how the new job goes.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri 
> > > <jbl...@nextel.es>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am trying to use the Keystore to manage some state information.
> > > > Basically this is the code I am using. As long as I have tested, 
> > > > the rest is working correctly.
> > > >
> > > > private KeyValueStore<String, String> storestp;
> > > >
> > > > public void init(Config config, TaskContext context) {
> > > >                  this.storestp = (KeyValueStore<String, String>) 
> > > > context.getStore("stepdb");
> > > >                }
> > > >
> > > >        public void process(IncomingMessageEnvelope envelope,
> > > >                     MessageCollector collector,
> > > >                     TaskCoordinator coordinator)
> > > >                     {
> > > >                            …
> > > > String str = storestp.get(code)
> > > > …
> > > > }
> > > >
> > > > When I load it, it goes to running but, whe I send the messages 
> > > > through Kafka stream It goes to Failed state. I have found this
> > > Exception:
> > > > Exception in thread "main" org.apache.samza.SamzaException: 
> > > > Missing a change log offset for SystemStreamPartition [kafka,
> > stepdb-changelog, 2].
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumer
> > > s$3$
> > > $anonfun$1.apply(TaskStorageManager.scala:87)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumer
> > > s$3$
> > > $anonfun$1.apply(TaskStorageManager.scala:87)
> > > >         at
> scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > > >         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> > > apply(TaskStorageManager.scala:87)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> > > apply(TaskStorageManager.scala:84)
> > > >         at
> > > >
> > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.app
> > > ly(T
> > > raversableLike.scala:772)
> > > >         at
> > > >
> > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(Map
> > > Like
> > > .scala:245)
> > > >         at
> > > >
> > > scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(Map
> > > Like
> > > .scala:245)
> > > >         at
> > > >
> > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.app
> > > ly(T
> > > raversableLike.scala:772)
> > > >         at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> > > >         at
> > > >
> > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik
> > > e.sc
> > > ala:771)
> > > >         at
> > > scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> > > >         at
> > > >
> > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik
> > > e.sc
> > > ala:771)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager.startConsumers(TaskSto
> > > rage
> > > Manager.scala:84)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManage
> > > r.sc
> > > ala:63)
> > > >         at
> > > >
> > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.s
> > > cala
> > > :88)
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a
> > > pply
> > > (SamzaContainer.scala:607)
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a
> > > pply
> > > (SamzaContainer.scala:607)
> > > >         at
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >         at
> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >         at
> > > >
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala
> > :206)
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer.startStores(SamzaContain
> > > er.s
> > > cala:607)
> > > >         at
> > > >
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550
> )
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer
> > > .sca
> > > la:108)
> > > >         at
> > > >
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala
> > :87)
> > > >         at
> > > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.sc
> > > > ala)
> > > >
> > > > I have seen that the stepdb-changelog stream exists in Kafka. As 
> > > > a try to regenerate the missing offset and tes it I have 
> > > > connected through the command line and send a message to the 
> > > > stream. It was received
> > > correctly.
> > > > Now I am seeing the following Exception:
> > > >
> > > > Exception in thread "main" java.lang.NullPointerException
> > > >         at
> > > >
> > > scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOp
> > > s.sc
> > > ala:126)
> > > >         at
> > > > scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
> > > >         at scala.collection.SeqLike$class.size(SeqLike.scala:106)
> > > >         at
> > > > scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
> > > >         at
> > > >
> > > org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore
> > > $1.a
> > > pply(KeyValueStorageEngine.scala:94)
> > > >         at
> > > >
> > > org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore
> > > $1.a
> > > pply(KeyValueStorageEngine.scala:79)
> > > >         at
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >         at
> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >         at
> > > >
> > > org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValue
> > > Stor
> > > ageEngine.scala:79)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores
> > > $3.a
> > > pply(TaskStorageManager.scala:112)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores
> > > $3.a
> > > pply(TaskStorageManager.scala:106)
> > > >         at
> > > >
> > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.app
> > > ly(T
> > > raversableLike.scala:772)
> > > >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > > >         at
> > > >
> > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik
> > > e.sc
> > > ala:771)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStor
> > > ageM
> > > anager.scala:106)
> > > >         at
> > > >
> > > org.apache.samza.storage.TaskStorageManager.init(TaskStorageManage
> > > r.sc
> > > ala:64)
> > > >         at
> > > >
> > > org.apache.samza.container.TaskInstance.startStores(TaskInstance.s
> > > cala
> > > :88)
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a
> > > pply
> > > (SamzaContainer.scala:607)
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.a
> > > pply
> > > (SamzaContainer.scala:607)
> > > >         at
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > >         at
> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > >         at
> > > >
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala
> > :206)
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer.startStores(SamzaContain
> > > er.s
> > > cala:607)
> > > >         at
> > > >
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550
> )
> > > >         at
> > > >
> > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer
> > > .sca
> > > la:108)
> > > >         at
> > > >
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala
> > :87)
> > > >         at
> > > > org.apache.samza.container.SamzaContainer.main(SamzaContainer.sc
> > > > ala)
> > > >
> > > > Is there something wrong?
> > > >
> > > > Thanks,
> > > >
> > > >     Jordi
> > > > ________________________________ Jordi Blasi Uribarri Área I+D+i
> > > >
> > > > jbl...@nextel.es
> > > > Oficina Bilbao
> > > >
> > > > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
> > > >
> > >
> >
>

Reply via email to