Hey, Chris, thanks for the help.  I’ve done a bit more playing around and
found that, with my experimental setup, I have to set *both*
offset.default=oldest and reset.offset=true to consistently bootstrap the
stream from the beginning.  Here are the relevant config lines:

systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest

And here are the log outputs for each combination:

Only offset.default=oldest: http://pastebin.com/yxnw2Ypu
This behaves as expected — my task had checkpointed and I was not filling the 
wikipedia-raw topic with anything else, so the task did not read in any 
messages.

Only reset.offset=true: http://pastebin.com/sw50RiYz
2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using default offset UPCOMING for 
SystemStream [system=kafka, stream=wikipedia-raw].
2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using reset offset true for 
SystemStream [system=kafka, stream=wikipedia-raw].

In this case, just setting reset.offset=true is not enough — the stream still 
starts from UPCOMING and thus does not read any messages from before the 
checkpoint.

Both: http://pastebin.com/WF2MQZcN
2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using default offset OLDEST for 
SystemStream [system=kafka, stream=wikipedia-raw].
2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using reset offset true for 
SystemStream [system=kafka, stream=wikipedia-raw].

This case both resets the offset back to 0 and consumes messages from the 0th 
message in wikipedia-raw.  Is this the expected behavior?  Again, thanks for 
the help!
—T
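
For reference, the three runs above are consistent with the rule of thumb
sketched below. This is only a minimal Python illustration of the observed
behavior, not Samza's actual OffsetManager logic. The function
starting_offset and its parameters are hypothetical names, the offsets
(oldest=0, upcoming=42) come from the metadata log line quoted further down,
and the checkpoint value of 42 is an assumption.

```python
from typing import Optional


def starting_offset(
    checkpoint: Optional[int],
    offset_default: str = "upcoming",
    reset_offset: bool = False,
    oldest: int = 0,
    upcoming: int = 42,
) -> int:
    """Hypothetical model of where a stream starts reading (not Samza code)."""
    # A checkpoint wins unless reset.offset=true says to disregard it.
    if checkpoint is not None and not reset_offset:
        return checkpoint
    # Otherwise fall back to offset.default ("upcoming" when unset).
    return oldest if offset_default == "oldest" else upcoming


# Assumed: the task had already checkpointed at offset 42.
print(starting_offset(42, offset_default="oldest"))                     # 42
print(starting_offset(42, reset_offset=True))                           # 42
print(starting_offset(42, offset_default="oldest", reset_offset=True))  # 0
```

Under this model reset.offset only discards the checkpoint, and
offset.default then decides where reading starts, which would explain why
both settings are needed to replay from offset 0.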

On Apr 10, 2014, at 2:01 PM, Chris Riccomini <[email protected]> wrote:

> Hey TJ,
> 
> Samza has a setting called systems.%s.streams.%s.samza.offset.default. If
> undefined, it uses "upcoming", which means start from the most recent
> message in the topic. For a bootstrap stream, you probably want to set
> this to "oldest".
> 
>  systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
> 
> This will tell Samza to start from offset 0, not offset 42, in the case
> where there is no checkpoint available (i.e., when you start your job for
> the first time). Once there is a checkpoint (e.g., you've run your job for
> a period of time, and a checkpoint manager is configured), Samza will use
> the checkpoint. For example, if you've read to offset 55, and then
> restart, the SamzaContainer will begin bootstrapping from offset 55 (not
> offset 0). To force the bootstrapping to ALWAYS begin from offset 0, use
> this setting:
> 
>  systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
> 
> 
> This will tell Samza to disregard the checkpointed offset for the stream
> when it starts up. By forcing the default offset to oldest, and
> disregarding checkpointed offsets, you'll always re-process the entire
> bootstrap stream from offset 0. This is likely what you want.
> 
> This is undocumented right now. Sorry about that.
> 
> Cheers,
> Chris
> 
> On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:
> 
>> Hi, I'm having some trouble getting bootstrapping to work.  To start with
>> a simple example, I've augmented hello-samza to get the Wikipedia feed
>> parser to bootstrap from the wikipedia-raw topic.  When I peek into the
>> logs, I see the following lines:
>> 
>> 2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata:
>> Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw,
>> partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
>> [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>> 
>> 2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with:
>> useBatching=false, useBootstrapping=true, usePriority=true
>> 
>> 2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully
>> caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>> 
>> 2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and
>> partition [wikipedia-raw,0]
>> 2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from
>> offset 42 for topic and partition [wikipedia-raw,0]. Using it to
>> instantiate consumer.
>> 
>> So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>> but as the stream processor is finishing initialization, it seems to
>> think that wikipedia-raw has been fully bootstrapped and so it starts
>> from offset 42.  Any ideas?  My config is below, full logs on pastebin
>> (http://pastebin.com/ZatAA0Ak)
>> 
>> # Job
>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> job.name=wikipedia-parser
>> 
>> # Task
>> task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>> task.inputs=kafka.wikipedia-raw
>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> task.checkpoint.system=kafka
>> # Normally, this would be 3, but we have only one broker.
>> task.checkpoint.replication.factor=1
>> 
>> # Das boot
>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>> 
>> # Metrics
>> metrics.reporters=snapshot,jmx
>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> metrics.reporter.snapshot.stream=kafka.metrics
>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>> 
>> # Serializers
>> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>> 
>> # Systems
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.samza.msg.serde=json
>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> #systems.kafka.consumer.auto.offset.reset=smallest
>> systems.kafka.producer.metadata.broker.list=localhost:9092
>> systems.kafka.producer.producer.type=sync
>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>> systems.kafka.producer.batch.num.messages=1
>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>> 
>> Thanks!
>> -T
> 
