Hey TJ,

Correct, you've got it.

* `reset.offset` tells Samza to ignore the checkpointed offset
* `offset.default` tells Samza where to start when there's no checkpointed
offset (because you've told it to ignore the checkpoint)
* `bootstrap=true` tells Samza to prioritize the stream above all
non-bootstrap streams, and to forcibly read all of its messages until the
stream is caught up before allowing any other messages to be processed.
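To make the interaction concrete, here's a rough Python model of how the first two settings pick a starting offset for a single partition. It's a simplified sketch, not Samza's OffsetManager code: `starting_offset` and `PartitionMetadata` are hypothetical names, and the model glosses over details such as exactly where Samza resumes relative to a checkpointed offset. `bootstrap=true` is deliberately left out, because it controls prioritization, not where reading starts.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PartitionMetadata:
    # Mirrors the fields in the KafkaSystemAdmin "Got metadata" log line.
    oldest_offset: int
    newest_offset: int
    upcoming_offset: int


def starting_offset(
    metadata: PartitionMetadata,
    checkpointed: Optional[int],
    reset_offset: bool = False,
    offset_default: str = "upcoming",  # assumed default when unset
) -> int:
    """Hypothetical model of choosing a starting offset for one partition."""
    # Use the checkpoint only if one exists and reset.offset is not set.
    if checkpointed is not None and not reset_offset:
        return checkpointed
    # No usable checkpoint: fall back to offset.default.
    if offset_default == "oldest":
        return metadata.oldest_offset
    if offset_default == "upcoming":
        return metadata.upcoming_offset
    raise ValueError(f"unknown offset.default: {offset_default!r}")


meta = PartitionMetadata(oldest_offset=0, newest_offset=41, upcoming_offset=42)
# reset.offset=true alone: falls back to upcoming, so nothing old is re-read.
print(starting_offset(meta, checkpointed=42, reset_offset=True))
# reset.offset=true plus offset.default=oldest: rewinds to the start.
print(starting_offset(meta, checkpointed=42, reset_offset=True, offset_default="oldest"))
```

The two calls roughly mirror your "only reset.offset" and "both" runs: resetting without changing the default just lands you back at upcoming.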

Reflecting on this now, it seems a bit complicated. We could have Samza
set these automatically when `bootstrap=true` is set.
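Here's a rough sketch of what that could look like, assuming the config is available as a flat map of string keys to string values. `apply_bootstrap_defaults` is a hypothetical helper, not an existing Samza API. It only fills in `reset.offset=true` and `offset.default=oldest` for streams marked `bootstrap=true`, and leaves alone any value the user set explicitly.

```python
from typing import Dict

BOOTSTRAP_SUFFIX = ".samza.bootstrap"


def apply_bootstrap_defaults(config: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of config with bootstrap-friendly defaults filled in.

    Hypothetical helper: Samza does not do this today.
    """
    result = dict(config)
    for key, value in config.items():
        # Only per-stream keys such as
        # systems.kafka.streams.wikipedia-raw.samza.bootstrap
        if (
            key.startswith("systems.")
            and ".streams." in key
            and key.endswith(BOOTSTRAP_SUFFIX)
            and value.strip().lower() == "true"
        ):
            prefix = key[: -len(BOOTSTRAP_SUFFIX)]
            # setdefault keeps any value the user set explicitly.
            result.setdefault(prefix + ".samza.reset.offset", "true")
            result.setdefault(prefix + ".samza.offset.default", "oldest")
    return result


cfg = {"systems.kafka.streams.wikipedia-raw.samza.bootstrap": "true"}
for k, v in sorted(apply_bootstrap_defaults(cfg).items()):
    print(f"{k}={v}")
```

Using setdefault means anyone who really wants a bootstrap stream to resume from its checkpoint can still opt out by setting the keys themselves.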

Cheers,
Chris

On 4/11/14 10:11 AM, "TJ Giuli" <[email protected]> wrote:

>Hey, Chris, thanks for the help.  I’ve done a bit more playing around and
>I’ve found that with my experimental setup I have to set *both*
>offset.default=oldest and reset.offset=true to consistently bootstrap the
>stream from the beginning.  Here are the config lines of importance:
>
>systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>
>And here are the log outputs
>
>Only offset.default=oldest: http://pastebin.com/yxnw2Ypu
>This behaves as expected — my task had checkpointed and I was not filling
>the wikipedia-raw topic with anything else, so the task did not read in
>any messages.
>
>Only reset.offset=true: http://pastebin.com/sw50RiYz
>2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using default offset UPCOMING for SystemStream [system=kafka, stream=wikipedia-raw].
>2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using reset offset true for SystemStream [system=kafka, stream=wikipedia-raw].
>
>In this case, just setting reset.offset=true is not enough — the stream
>still starts from UPCOMING and thus does not read any messages from
>before the checkpoint.
>
>Both: http://pastebin.com/WF2MQZcN
>2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using default offset OLDEST for SystemStream [system=kafka, stream=wikipedia-raw].
>2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using reset offset true for SystemStream [system=kafka, stream=wikipedia-raw].
>
>This case both resets the offset back to 0 and consumes messages from the
>0th message in wikipedia-raw.  Is this the expected behavior?  Again,
>thanks for the help!
>—T
>
>On Apr 10, 2014, at 2:01 PM, Chris Riccomini <[email protected]> wrote:
>
>> Hey TJ,
>> 
>> Samza has a setting called systems.%s.streams.%s.samza.offset.default.
>> If undefined, it uses "upcoming", which means start just after the most
>> recent message in the topic. For a bootstrap stream, you probably want to
>> set this to "oldest".
>> 
>>  systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>> 
>> This will tell Samza to start from offset 0, not offset 42, in the case
>> where there is no checkpoint available (i.e., when you start your job for
>> the first time). Once there is a checkpoint (e.g., you've run your job for
>> a period of time, and a checkpoint manager is configured), Samza will use
>> the checkpoint. For example, if you've read to offset 55, and then
>> restart, the SamzaContainer will begin bootstrapping from offset 55 (not
>> offset 0). To force the bootstrapping to ALWAYS begin from offset 0, use
>> this setting in addition to the one above:
>> 
>>  systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>> 
>> 
>> This will tell Samza to disregard the checkpointed offset for the stream
>> when it starts up. By forcing the offset to oldest, and disregarding
>> checkpointed offsets, you'll always re-process the entire bootstrap stream
>> from offset 0. This is likely what you want.
>> 
>> This is undocumented right now. Sorry about that.
>> 
>> Cheers,
>> Chris
>> 
>> On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:
>> 
>>> Hi, I'm having some trouble getting bootstrapping to work.  To start with
>>> a simple example, I've augmented hello-samza to get the Wikipedia feed
>>> parser to bootstrap from the wikipedia-raw topic.  When I peek into the
>>> logs, I see the following lines:
>>> 
>>> 2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata: Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>>> 
>>> 2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=true, usePriority=true
>>> 
>>> 2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>>> 
>>> 2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and partition [wikipedia-raw,0]
>>> 2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from offset 42 for topic and partition [wikipedia-raw,0]. Using it to instantiate consumer.
>>> 
>>> So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>>> but as the stream processor is finishing initialization, it seems to
>>> think that wikipedia-raw has been fully bootstrapped and so it starts
>>> from offset 42.  Any ideas?  My config is below, full logs on pastebin
>>> (http://pastebin.com/ZatAA0Ak)
>>> 
>>> # Job
>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>> job.name=wikipedia-parser
>>> 
>>> # Task
>>> task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>>> task.inputs=kafka.wikipedia-raw
>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>> task.checkpoint.system=kafka
>>> # Normally, this would be 3, but we have only one broker.
>>> task.checkpoint.replication.factor=1
>>> 
>>> # Das boot
>>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>> 
>>> # Metrics
>>> metrics.reporters=snapshot,jmx
>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>> metrics.reporter.snapshot.stream=kafka.metrics
>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>> 
>>> # Serializers
>>> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>> 
>>> # Systems
>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> systems.kafka.samza.msg.serde=json
>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>> #systems.kafka.consumer.auto.offset.reset=smallest
>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>> systems.kafka.producer.producer.type=sync
>>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>>> systems.kafka.producer.batch.num.messages=1
>>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>>> 
>>> Thanks!
>>> -T
>> 
>
