Hey TJ,

Samza has a setting called systems.%s.streams.%s.samza.offset.default. If
it is undefined, Samza uses "upcoming", which means start just after the
most recent message in the topic, so only newly arriving messages are read.
For a bootstrap stream, you probably want to set this to "oldest".

  systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest

This will tell Samza to start from offset 0, not offset 42, in the case
where there is no checkpoint available (i.e., when you start your job for
the first time). Once there is a checkpoint (e.g., you've run your job for
a period of time, and a checkpoint manager is configured), Samza will use
the checkpoint. For example, if you've read to offset 55 and then
restart, the SamzaContainer will begin bootstrapping from offset 55 (not
offset 0). To force bootstrapping to ALWAYS begin from offset 0, use
this setting:

  systems.kafka.streams.wikipedia-raw.samza.reset.offset=true


This will tell Samza to disregard the checkpointed offset for the stream
when it starts up. By defaulting the offset to oldest and disregarding
checkpointed offsets, you'll always re-process the entire bootstrap stream
from offset 0. This is likely what you want.
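A rough Python model of the two settings may make their interaction clearer. This is not Samza's code: `starting_offset` and `PartitionMetadata` are hypothetical names, the offsets come from the KafkaSystemAdmin log line quoted below, and resuming exactly at the checkpointed offset is a simplification that follows the description above.

```python
from dataclasses import dataclass
from typing import Mapping, Optional

# Key templates from the reply; the %s slots are the system and stream names.
OFFSET_DEFAULT_KEY = "systems.%s.streams.%s.samza.offset.default"
RESET_OFFSET_KEY = "systems.%s.streams.%s.samza.reset.offset"


@dataclass(frozen=True)
class PartitionMetadata:
    """The three offsets KafkaSystemAdmin reports for a partition."""
    oldest: int
    newest: int
    upcoming: int


def starting_offset(config: Mapping[str, str], system: str, stream: str,
                    meta: PartitionMetadata, checkpoint: Optional[int]) -> int:
    """Hypothetical model of where a consumer starts reading one partition."""
    # reset.offset=true means any checkpoint for this stream is ignored.
    reset = config.get(RESET_OFFSET_KEY % (system, stream), "false") == "true"
    if checkpoint is not None and not reset:
        # Simplification: resume exactly at the checkpointed offset.
        return checkpoint
    # No usable checkpoint: fall back to offset.default, which is "upcoming"
    # when unset.
    default = config.get(OFFSET_DEFAULT_KEY % (system, stream), "upcoming")
    if default == "oldest":
        return meta.oldest
    if default == "upcoming":
        return meta.upcoming
    raise ValueError(f"unsupported offset.default value: {default!r}")


# Offsets taken from the "Got metadata" log line in TJ's message.
meta = PartitionMetadata(oldest=0, newest=41, upcoming=42)

# TJ's settings: bootstrap and reset.offset, but no offset.default.
tj_config = {
    "systems.kafka.streams.wikipedia-raw.samza.bootstrap": "true",
    "systems.kafka.streams.wikipedia-raw.samza.reset.offset": "true",
}
# The suggested fix: also set offset.default=oldest.
fixed_config = {**tj_config,
                "systems.kafka.streams.wikipedia-raw.samza.offset.default": "oldest"}
# offset.default alone, without reset.offset.
no_reset_config = {
    "systems.kafka.streams.wikipedia-raw.samza.offset.default": "oldest",
}

print(starting_offset(tj_config, "kafka", "wikipedia-raw", meta, None))      # 42
print(starting_offset(fixed_config, "kafka", "wikipedia-raw", meta, 55))     # 0
print(starting_offset(no_reset_config, "kafka", "wikipedia-raw", meta, 55))  # 55
```

Run against TJ's settings, the model lands on 42, which matches the "Validating offset 42" line in his log: reset.offset=true discards any checkpoint, and the unset offset.default falls back to "upcoming".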

This is undocumented right now. Sorry about that.

Cheers,
Chris

On 4/10/14 11:19 AM, "TJ Giuli" wrote:

>Hi, I'm having some trouble getting bootstrapping to work.  To start with
>a simple example, I've augmented hello-samza to get the Wikipedia feed
>parser to bootstrap from the wikipedia-raw topic.  When I peek into the
>logs, I see the following lines:
>
>2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata:
>Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw,
>partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
>[oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>
>2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with:
>useBatching=false, useBootstrapping=true, usePriority=true
>
>2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully
>caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>
>2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and
>partition [wikipedia-raw,0]
>2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from
>offset 42 for topic and partition [wikipedia-raw,0]. Using it to
>instantiate consumer.
>
>So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>but as the stream processor is finishing initialization, it seems to
>think that wikipedia-raw has been fully bootstrapped and so it starts
>from offset 42.  Any ideas?  My config is below, full logs on pastebin
>(http://pastebin.com/ZatAA0Ak)
>
># Job
>job.factory.class=org.apache.samza.job.local.LocalJobFactory
>job.name=wikipedia-parser
>
># Task
>task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>task.inputs=kafka.wikipedia-raw
>task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>task.checkpoint.system=kafka
># Normally, this would be 3, but we have only one broker.
>task.checkpoint.replication.factor=1
>
># Das boot
>systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>
># Metrics
>metrics.reporters=snapshot,jmx
>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>metrics.reporter.snapshot.stream=kafka.metrics
>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>
># Serializers
>serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>
># Systems
>systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>systems.kafka.samza.msg.serde=json
>systems.kafka.consumer.zookeeper.connect=localhost:2181/
>#systems.kafka.consumer.auto.offset.reset=smallest
>systems.kafka.producer.metadata.broker.list=localhost:9092
>systems.kafka.producer.producer.type=sync
># Normally, we'd set this much higher, but we want things to look snappy in the demo.
>systems.kafka.producer.batch.num.messages=1
>systems.kafka.streams.metrics.samza.msg.serde=metrics
>
>Thanks!
>-T
