Chinmay,

My bad. You were right on. I had changed one of them but not both.
It is now consuming all data. Thanks a lot.

- Shekar

On Sat, Jan 17, 2015 at 2:41 PM, Chinmay Soman <[email protected]> wrote:

> Stupid question (and I probably already know the answer) but I wanted to be
> extra sure. Im assuming you've replaced the "system-name" and "stream-name"
> with the actual terms (Kafka and whatever the topic name is) ?
>
> On Jan 17, 2015 2:31 PM, "Shekar Tippur" <[email protected]> wrote:
>
> > Chinmay,
> >
> > I see this on the logs:
> >
> > 2015-01-17 14:24:40 SamzaContainer$ [INFO] Using configuration:
> > {system.redis=10.132.67.120,
> > metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory,
> > *systems.system-name.streams.stream-name.samza.offset.default=oldest*,
> > task.drop.deserialization.errors=true,
> > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory,
> > task.drop.serialization.errors=true,
> > systems.kafka.streams.metrics.samza.msg.serde=metrics,
> > task.class=samza.examples.wikipedia.task.ArgosParserStreamTask,
> > yarn.package.path=file:///home/ctippur/hello-samza/samza-job-package/target/samza-job-package-0.7.0-dist.tar.gz,
> > metrics.reporter.snapshot.stream=kafka.metrics,
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory,
> > *systems.system-name.streams.stream-name.samza.reset.offset=true*,
> > task.inputs=kafka.argos-raw,
> > metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory,
> > serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory,
> > job.factory.class=org.apache.samza.job.local.ThreadJobFactory,
> > systems.kafka.producer.batch.num.messages=1,
> > task.checkpoint.replication.factor=1,
> > systems.kafka.producer.producer.type=sync,
> > job.name=argos-parser,
> > systems.kafka.samza.msg.serde=json,
> > systems.kafka.consumer.auto.offset.reset=largest,
> > systems.kafka.consumer.zookeeper.connect=localhost:2181/,
> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory,
> > task.ignored.exceptions=org.codehaus.jackson.JsonParseException,
> > metrics.reporters=snapshot,jmx,
> > task.checkpoint.system=kafka,
> > systems.kafka.producer.metadata.broker.list=localhost:9092}
> >
> > Indicating that it is reading the property. I dont see the job reading from
> > the beginning.
> >
> > On Fri, Jan 16, 2015 at 9:42 PM, Chinmay Soman <[email protected]> wrote:
> >
> > > It looks like it is still referring to the offset received from the
> > > Checkpoint manager. Can you double check your config for typos ?
> > >
> > > On Fri, Jan 16, 2015 at 9:35 PM, Shekar Tippur <[email protected]> wrote:
> > >
> > > > Chris -
> > > >
> > > > Here are the logs:
> > > >
> > > > 2015-01-16 17:30:15 KafkaCheckpointManager [INFO] Got checkpoint state for taskName Partition 0: Checkpoint [offsets={SystemStreamPartition [kafka, argos-raw, 0]=60958}]
> > > > 2015-01-16 17:30:15 OffsetManager [INFO] Successfully loaded last processed offsets: Map(SystemStreamPartition [kafka, argos-raw, 0] -> 60958)
> > > > 2015-01-16 17:30:15 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [kafka, argos-raw, 0] -> 60959)
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Starting task instance stores.
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Initializing stream tasks.
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Registering task instances with producers.
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Starting producer multiplexer.
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Registering task instances with consumers.
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Starting consumer multiplexer.
> > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Verifying properties
> > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-argos_parser-1-1421458212216-3
> > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Property metadata.broker.list is overridden to localhost:9092
> > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
> > > > 2015-01-16 17:30:15 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(argos-raw)
> > > > 2015-01-16 17:30:15 SyncProducer [INFO] Connected to localhost:9092 for producing
> > > > 2015-01-16 17:30:15 SyncProducer [INFO] Disconnecting from localhost:9092
> > > > 2015-01-16 17:30:15 BrokerProxy [INFO] Creating new SimpleConsumer for host pppdc9prd2yv.corp.net:9092 for system kafka
> > > > 2015-01-16 17:30:15 GetOffset [INFO] Validating offset 60959 for topic and partition [argos-raw,0]
> > > > 2015-01-16 17:30:15 GetOffset [INFO] Able to successfully read from offset 60959 for topic and partition [argos-raw,0]. Using it to instantiate consumer.
> > > > 2015-01-16 17:30:15 BrokerProxy [INFO] Starting BrokerProxy for pppdc9prd2yv.corp.net:9092
> > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Entering run loop.
> > > > 2015-01-16 17:30:15 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
> > > > 2015-01-16 17:30:15 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_argos-parser_1)
> > > > 2015-01-16 17:30:15 SyncProducer [INFO] Connected to localhost:9092 for producing
> > > > 2015-01-16 17:30:15 SyncProducer [INFO] Disconnecting from localhost:9092
> > > > 2015-01-16 17:30:15 SyncProducer [INFO] Connected to pppdc9prd2yv.corp.net:9092 for producing
> > > >
> > > > On Fri, Jan 16, 2015 at 6:03 PM, Chris Riccomini <[email protected]> wrote:
> > > >
> > > > > Hey Shekar,
> > > > >
> > > > > Hmm. Could you post your logs, so we can have a look? The logs will post
> > > > > both the config, and the offsets used. If there's an issue, we should be
> > > > > able to figure it out.
> > > > >
> > > > > Cheers,
> > > > > Chris
> > > > >
> > > > > On 1/16/15 5:41 PM, "Shekar Tippur" <[email protected]> wrote:
> > > > >
> > > > > > Interesting ..
> > > > > >
> > > > > > I have added
> > > > > >
> > > > > > systems.system-name.streams.stream-name.samza.reset.offset = true
> > > > > > systems.system-name.streams.stream-name.samza.offset.default = oldest
> > > > > >
> > > > > > to the config. I see that it is still looking only the latest events.
> > > > > >
> > > > > > - Shekar
> > > > > >
> > > > > > On Fri, Jan 16, 2015 at 3:26 PM, Chinmay Soman <[email protected]> wrote:
> > > > > >
> > > > > > > Aah that's right. Thanks for confirming.
> > > > > > >
> > > > > > > On Fri, Jan 16, 2015 at 3:22 PM, Chris Riccomini <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hey Chinmay,
> > > > > > > >
> > > > > > > > We do have checkpoint-tool.sh, which allows you to set input offsets to
> > > > > > > > arbitrary locations. One caveat here is that your job has to be offline
> > > > > > > > when this is done. When coordinator stream is done, it should be possible
> > > > > > > > to dynamically change the offsets (by bouncing the containers) when the
> > > > > > > > offsets change.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On 1/16/15 2:13 PM, "Chinmay Soman" <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > We don't have a way to start from a known offset right ? Do you guys
> > > > > > > > > think that might be useful ?
> > > > > > > > >
> > > > > > > > > On Fri, Jan 16, 2015 at 11:55 AM, Shekar Tippur <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Yan,
> > > > > > > > > > Thanks. This really helps.
> > > > > > > > > >
> > > > > > > > > > - Shekar
> > > > > > > > > >
> > > > > > > > > > On Fri, Jan 16, 2015 at 11:52 AM, Yan Fang <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Shekar,
> > > > > > > > > > >
> > > > > > > > > > > Assuming you are using 0.8, there are configurations that you can set to
> > > > > > > > > > > read messages from oldest.
> > > > > > > > > > >
> > > > > > > > > > > systems.system-name.streams.stream-name.samza.reset.offset = true
> > > > > > > > > > > systems.system-name.streams.stream-name.samza.offset.default = oldest
> > > > > > > > > > >
> > > > > > > > > > > See
> > > > > > > > > > > http://samza.incubator.apache.org/learn/documentation/0.8/jobs/configuration-table.html
> > > > > > > > > > >
> > > > > > > > > > > Hope this helps.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Fang, Yan
> > > > > > > > > > > [email protected]
> > > > > > > > > > > +1 (206) 849-4108
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jan 16, 2015 at 11:02 AM, Shekar Tippur <[email protected]> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello,
> > > > > > > > > > > >
> > > > > > > > > > > > I was wondering what would be the recommended way to point Samza to
> > > > > > > > > > > > consume older messages?
> > > > > > > > > > > >
> > > > > > > > > > > > I have made some code changes that I want to apply to all the messages
> > > > > > > > > > > > that have been read so far.
> > > > > > > > > > > >
> > > > > > > > > > > > - Shekar
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Thanks and regards
> > > > > > > > >
> > > > > > > > > Chinmay Soman
> > > > > > >
> > > > > > > --
> > > > > > > Thanks and regards
> > > > > > >
> > > > > > > Chinmay Soman
> > >
> > > --
> > > Thanks and regards
> > >
> > > Chinmay Soman
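
For reference, a minimal sketch of the fix this thread converges on: the two offset properties only apply to an input once the literal "system-name" and "stream-name" placeholders are replaced with the job's actual system and stream. The names below are assumptions inferred from task.inputs=kafka.argos-raw in the logged configuration (system "kafka", stream "argos-raw"); the thread does not show the final config verbatim.

```properties
# Assumed names: system "kafka", stream "argos-raw" (from task.inputs=kafka.argos-raw).
# Ignore any checkpointed offset for this input when the container starts...
systems.kafka.streams.argos-raw.samza.reset.offset=true
# ...so the starting position falls back to this default: the oldest available message.
systems.kafka.streams.argos-raw.samza.offset.default=oldest
```

Both keys have to use the real names; changing only one leaves the job on its checkpointed offset. As far as I understand the setting, leaving samza.reset.offset=true in place would also cause every later restart to re-read from the oldest offset, so it is probably worth removing once the reprocessing run has finished.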
