Sweet! Glad to see it working.
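
For anyone who lands on this thread later: the fix was to replace the literal "system-name" and "stream-name" placeholders in both properties. A rough sketch of what that looks like, assuming the system is named kafka and the stream is argos-raw (which is what task.inputs=kafka.argos-raw in the logged config below suggests):

```properties
# Assumed names: system "kafka", stream "argos-raw" (taken from task.inputs=kafka.argos-raw)
systems.kafka.streams.argos-raw.samza.reset.offset=true
systems.kafka.streams.argos-raw.samza.offset.default=oldest
```

Since reset.offset=true makes the container ignore the checkpointed offset on startup, it is probably worth removing that line again once the reprocessing is done.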

On Sat, Jan 17, 2015 at 5:16 PM, Shekar Tippur <[email protected]> wrote:

> Chinmay,
>
> My bad. You were right on. I had changed one of them but not both.
>
> It is now consuming all data.
>
> Thanks a lot.
>
> - Shekar
>
> On Sat, Jan 17, 2015 at 2:41 PM, Chinmay Soman <[email protected]>
> wrote:
>
> > Stupid question (and I probably already know the answer) but I wanted to be
> > extra sure. I'm assuming you've replaced the "system-name" and "stream-name"
> > with the actual terms (Kafka and whatever the topic name is)?
> > On Jan 17, 2015 2:31 PM, "Shekar Tippur" <[email protected]> wrote:
> >
> > > Chinmay,
> > >
> > > I see this on the logs:
> > >
> > > 2015-01-17 14:24:40 SamzaContainer$ [INFO] Using configuration:
> > > {system.redis=10.132.67.120,
> > > metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory,
> > > *systems.system-name.streams.stream-name.samza.offset.default=oldest*,
> > > task.drop.deserialization.errors=true,
> > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory,
> > > task.drop.serialization.errors=true,
> > > systems.kafka.streams.metrics.samza.msg.serde=metrics,
> > > task.class=samza.examples.wikipedia.task.ArgosParserStreamTask,
> > > yarn.package.path=file:///home/ctippur/hello-samza/samza-job-package/target/samza-job-package-0.7.0-dist.tar.gz,
> > > metrics.reporter.snapshot.stream=kafka.metrics,
> > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory,
> > > *systems.system-name.streams.stream-name.samza.reset.offset=true*,
> > > task.inputs=kafka.argos-raw,
> > > metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory,
> > > serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory,
> > > job.factory.class=org.apache.samza.job.local.ThreadJobFactory,
> > > systems.kafka.producer.batch.num.messages=1,
> > > task.checkpoint.replication.factor=1,
> > > systems.kafka.producer.producer.type=sync, job.name=argos-parser,
> > > systems.kafka.samza.msg.serde=json,
> > > systems.kafka.consumer.auto.offset.reset=largest,
> > > systems.kafka.consumer.zookeeper.connect=localhost:2181/,
> > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory,
> > > task.ignored.exceptions=org.codehaus.jackson.JsonParseException,
> > > metrics.reporters=snapshot,jmx, task.checkpoint.system=kafka,
> > > systems.kafka.producer.metadata.broker.list=localhost:9092}
> > >
> > > This indicates that it is reading the property, but I don't see the job
> > > reading from the beginning.
> > >
> > > On Fri, Jan 16, 2015 at 9:42 PM, Chinmay Soman <[email protected]>
> > > wrote:
> > >
> > > > It looks like it is still referring to the offset received from the
> > > > Checkpoint manager. Can you double check your config for typos?
> > > >
> > > > On Fri, Jan 16, 2015 at 9:35 PM, Shekar Tippur <[email protected]> wrote:
> > > >
> > > > > Chris -
> > > > >
> > > > > Here are the logs:
> > > > >
> > > > > > 2015-01-16 17:30:15 KafkaCheckpointManager [INFO] Got checkpoint state for taskName Partition 0: Checkpoint [offsets={SystemStreamPartition [kafka, argos-raw, 0]=60958}]
> > > > > >
> > > > > > 2015-01-16 17:30:15 OffsetManager [INFO] Successfully loaded last processed offsets: Map(SystemStreamPartition [kafka, argos-raw, 0] -> 60958)
> > > > > >
> > > > > > 2015-01-16 17:30:15 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [kafka, argos-raw, 0] -> 60959)
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Starting task instance stores.
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Initializing stream tasks.
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Registering task instances with producers.
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Starting producer multiplexer.
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Registering task instances with consumers.
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Starting consumer multiplexer.
> > > > > >
> > > > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Verifying properties
> > > > > >
> > > > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-argos_parser-1-1421458212216-3
> > > > > >
> > > > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Property metadata.broker.list is overridden to localhost:9092
> > > > > >
> > > > > > 2015-01-16 17:30:15 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
> > > > > >
> > > > > > 2015-01-16 17:30:15 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(argos-raw)
> > > > > >
> > > > > > 2015-01-16 17:30:15 SyncProducer [INFO] Connected to localhost:9092 for producing
> > > > > >
> > > > > > 2015-01-16 17:30:15 SyncProducer [INFO] Disconnecting from localhost:9092
> > > > > >
> > > > > > 2015-01-16 17:30:15 BrokerProxy [INFO] Creating new SimpleConsumer for host pppdc9prd2yv.corp.net:9092 for system kafka
> > > > > >
> > > > > > 2015-01-16 17:30:15 GetOffset [INFO] Validating offset 60959 for topic and partition [argos-raw,0]
> > > > > >
> > > > > > 2015-01-16 17:30:15 GetOffset [INFO] Able to successfully read from offset 60959 for topic and partition [argos-raw,0]. Using it to instantiate consumer.
> > > > > >
> > > > > > 2015-01-16 17:30:15 BrokerProxy [INFO] Starting BrokerProxy for pppdc9prd2yv.corp.net:9092
> > > > > >
> > > > > > 2015-01-16 17:30:15 SamzaContainer [INFO] Entering run loop.
> > > > > >
> > > > > > 2015-01-16 17:30:15 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
> > > > > >
> > > > > > 2015-01-16 17:30:15 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_argos-parser_1)
> > > > > >
> > > > > > 2015-01-16 17:30:15 SyncProducer [INFO] Connected to localhost:9092 for producing
> > > > > >
> > > > > > 2015-01-16 17:30:15 SyncProducer [INFO] Disconnecting from localhost:9092
> > > > > >
> > > > > > 2015-01-16 17:30:15 SyncProducer [INFO] Connected to pppdc9prd2yv.corp.net:9092 for producing
> > > > >
> > > > > On Fri, Jan 16, 2015 at 6:03 PM, Chris Riccomini <
> > > > > [email protected]> wrote:
> > > > >
> > > > > > Hey Shekar,
> > > > > >
> > > > > > > Hmm. Could you post your logs, so we can have a look? The logs will post
> > > > > > > both the config, and the offsets used. If there's an issue, we should be
> > > > > > > able to figure it out.
> > > > > >
> > > > > > Cheers,
> > > > > > Chris
> > > > > >
> > > > > > On 1/16/15 5:41 PM, "Shekar Tippur" <[email protected]> wrote:
> > > > > >
> > > > > > >Interesting ..
> > > > > > >
> > > > > > >
> > > > > > > >I have added
> > > > > > > >
> > > > > > > >systems.system-name.streams.stream-name.samza.reset.offset = true
> > > > > > > >
> > > > > > > >systems.system-name.streams.stream-name.samza.offset.default = oldest
> > > > > > > >
> > > > > > > >to the config. I see that it is still looking only at the latest events.
> > > > > > >
> > > > > > >
> > > > > > >- Shekar
> > > > > > >
> > > > > > > >On Fri, Jan 16, 2015 at 3:26 PM, Chinmay Soman <[email protected]>
> > > > > > > >wrote:
> > > > > > >
> > > > > > >> Aah that's right. Thanks for confirming.
> > > > > > >>
> > > > > > >> On Fri, Jan 16, 2015 at 3:22 PM, Chris Riccomini <
> > > > > > >> [email protected]> wrote:
> > > > > > >>
> > > > > > >> > Hey Chinmay,
> > > > > > >> >
> > > > > > > >> > We do have checkpoint-tool.sh, which allows you to set input offsets to
> > > > > > > >> > arbitrary locations. One caveat here is that your job has to be offline
> > > > > > > >> > when this is done. Once the coordinator stream is done, it should be
> > > > > > > >> > possible to dynamically change the offsets (by bouncing the containers)
> > > > > > > >> > when the offsets change.
> > > > > > >> >
> > > > > > >> > Cheers,
> > > > > > >> > Chris
> > > > > > >> >
> > > > > > > >> > On 1/16/15 2:13 PM, "Chinmay Soman" <[email protected]> wrote:
> > > > > > > >> >
> > > > > > > >> > >We don't have a way to start from a known offset, right? Do you guys think
> > > > > > > >> > >that might be useful?
> > > > > > >> > >
> > > > > > > >> > >On Fri, Jan 16, 2015 at 11:55 AM, Shekar Tippur <[email protected]> wrote:
> > > > > > >> > >
> > > > > > >> > >> Yan,
> > > > > > >> > >> Thanks. This really helps.
> > > > > > >> > >>
> > > > > > >> > >> - Shekar
> > > > > > >> > >>
> > > > > > > >> > >> On Fri, Jan 16, 2015 at 11:52 AM, Yan Fang <[email protected]> wrote:
> > > > > > >> > >>
> > > > > > >> > >> > Hi Shekar,
> > > > > > >> > >> >
> > > > > > > >> > >> > Assuming you are using 0.8, there are configurations that you can set to
> > > > > > > >> > >> > read messages from oldest.
> > > > > > > >> > >> >
> > > > > > > >> > >> > systems.system-name.streams.stream-name.samza.reset.offset = true
> > > > > > > >> > >> > systems.system-name.streams.stream-name.samza.offset.default = oldest
> > > > > > > >> > >> >
> > > > > > > >> > >> > See
> > > > > > > >> > >> >
> > > > > > > >> > >> > http://samza.incubator.apache.org/learn/documentation/0.8/jobs/configuration-table.html
> > > > > > >> > >> >
> > > > > > >> > >> > Hope this helps.
> > > > > > >> > >> >
> > > > > > >> > >> > Thanks,
> > > > > > >> > >> >
> > > > > > >> > >> > Fang, Yan
> > > > > > >> > >> > [email protected]
> > > > > > >> > >> > +1 (206) 849-4108
> > > > > > >> > >> >
> > > > > > > >> > >> > On Fri, Jan 16, 2015 at 11:02 AM, Shekar Tippur <[email protected]> wrote:
> > > > > > >> > >> >
> > > > > > >> > >> > > Hello,
> > > > > > >> > >> > >
> > > > > > > >> > >> > > I was wondering what would be the recommended way to point Samza to
> > > > > > > >> > >> > > consume older messages?
> > > > > > > >> > >> > >
> > > > > > > >> > >> > > I have made some code changes that I want to apply to all the messages
> > > > > > > >> > >> > > that have been read so far.
> > > > > > >> > >> > >
> > > > > > >> > >> > > - Shekar
> > > > > > >> > >> > >
> > > > > > >> > >> >
> > > > > > >> > >>
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >--
> > > > > > >> > >Thanks and regards
> > > > > > >> > >
> > > > > > >> > >Chinmay Soman
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> Thanks and regards
> > > > > > >>
> > > > > > >> Chinmay Soman
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks and regards
> > > >
> > > > Chinmay Soman
> > > >
> > >
> >
>



-- 
Thanks and regards

Chinmay Soman
