Gotcha.  Regarding whether bootstrap=true should by default read from the 
oldest available message on a topic, I would say it depends to some degree on 
how bootstrapping behaves in relation to the reliable state store (e.g. 
KeyValueStore).  Is a state store’s topic automatically treated as a 
bootstrap stream without explicitly marking it as one in the config file?  If 
I use the reliable state store and I’m guaranteed that 1.) the state store will 
fully bootstrap before any other stream and 2.) afterward any other stream I’ve 
marked as a bootstrap will fully consume any outstanding messages since the 
last checkpoint, then that behavior seems reasonable to me.

Can you go into some detail about how these two areas of the config interact?  
Thanks!
—T
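
For reference, here is a rough Python sketch of how the two offset settings in the quoted reply below appear to combine. It only illustrates the rules as described in this thread and is not Samza's actual implementation. The function name `resolve_start_offset` and its parameters are hypothetical, and the checkpoint value of 42 in the example calls is an assumption based on the `upcomingOffset=42` shown in the logs. `bootstrap=true` is not modeled, since per the reply it only affects which stream is read first, not where reading starts.

```python
from typing import Optional


def resolve_start_offset(
    checkpoint: Optional[int],
    reset_offset: bool,
    offset_default: str,
    oldest_offset: int,
    upcoming_offset: int,
) -> int:
    """Pick a starting offset using the rules described in this thread.

    Hypothetical helper for illustration only; not part of Samza.
    """
    if offset_default not in ("oldest", "upcoming"):
        raise ValueError(f"unsupported offset.default: {offset_default!r}")
    # reset.offset=true: ignore whatever offset was checkpointed.
    if reset_offset:
        checkpoint = None
    # A surviving checkpoint always wins over offset.default.
    if checkpoint is not None:
        return checkpoint
    # No usable checkpoint: fall back to offset.default.
    return oldest_offset if offset_default == "oldest" else upcoming_offset


# Offsets taken from the logs: oldestOffset=0, upcomingOffset=42.
# The checkpoint of 42 is an assumed value for a task that had caught up.
print(resolve_start_offset(42, False, "oldest", 0, 42))   # only offset.default=oldest
print(resolve_start_offset(42, True, "upcoming", 0, 42))  # only reset.offset=true
print(resolve_start_offset(42, True, "oldest", 0, 42))    # both settings
```

Under these assumptions, only the last call starts from offset 0, which matches the three experiments reported in the quoted messages.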

On Apr 11, 2014, at 10:30 AM, Chris Riccomini <[email protected]> wrote:

> Hey TJ,
> 
> Correct, you've got it.
> 
> * `reset.offset` tells Samza to ignore the checkpointed offset
> * `offset.default` tells Samza where to start when there is no checkpointed
> offset (for example, because `reset.offset` told it to ignore one)
> * `bootstrap=true` tells Samza to prioritize the stream above all
> non-bootstrap streams, and to forcibly read all of its messages until the
> stream is caught up before allowing any other messages to be processed.
> 
> Reflecting on this now, it seems a bit complicated. We could have Samza
> set reset.offset and offset.default automatically when bootstrap=true is
> set.
> 
> Cheers,
> Chris
> 
> On 4/11/14 10:11 AM, "TJ Giuli" <[email protected]> wrote:
> 
>> Hey, Chris, thanks for the help.  I’ve done a bit more playing around and
>> I’ve found that with my experimental setup I have to set *both*
>> offset.default=oldest and reset.offset=true to consistently bootstrap the
>> stream from the beginning.  Here are the config lines of importance:
>> 
>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>> systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>> 
>> And here are the log outputs:
>> 
>> Only offset.default=oldest: http://pastebin.com/yxnw2Ypu
>> This behaves as expected — my task had checkpointed and I was not filling
>> the wikipedia-raw topic with anything else, so the task did not read in
>> any messages.
>> 
>> Only reset.offset=true: http://pastebin.com/sw50RiYz
>> 2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using default offset UPCOMING
>> for SystemStream [system=kafka, stream=wikipedia-raw].
>> 2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using reset offset true for
>> SystemStream [system=kafka, stream=wikipedia-raw].
>> 
>> In this case, just setting reset.offset=true is not enough — the stream
>> still starts from UPCOMING and thus does not read any messages from
>> before the checkpoint.
>> 
>> Both: http://pastebin.com/WF2MQZcN
>> 2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using default offset OLDEST
>> for SystemStream [system=kafka, stream=wikipedia-raw].
>> 2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using reset offset true for
>> SystemStream [system=kafka, stream=wikipedia-raw].
>> 
>> This case both resets the offset back to 0 and consumes messages from the
>> 0th message in wikipedia-raw.  Is this the expected behavior?  Again,
>> thanks for the help!
>> —T
>> 
>> 
>> 
>> 
>> On Apr 10, 2014, at 2:01 PM, Chris Riccomini <[email protected]>
>> wrote:
>> 
>>> Hey TJ,
>>> 
>>> Samza has a setting called systems.%s.streams.%s.samza.offset.default.
>>> If undefined, it uses "upcoming", which means start from the next message
>>> written to the topic, skipping everything already in it. For a bootstrap
>>> stream, you probably want to set this to "oldest".
>>> 
>>> systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>>> 
>>> This will tell Samza to start from offset 0, not offset 42, in the case
>>> where there is no checkpoint available (i.e. when you start your job for
>>> the first time). Once there is a checkpoint (e.g. you've run your job for
>>> a period of time, and a checkpoint manager is configured), Samza will use
>>> the checkpoint. For example, if you've read to offset 55, and then
>>> restart, the SamzaContainer will begin bootstrapping from offset 55 (not
>>> offset 0). To force the bootstrapping to ALWAYS begin from offset 0, use
>>> this setting:
>>> 
>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>> 
>>> 
>>> This will tell Samza to disregard the checkpointed offset for the stream
>>> when it starts up. By setting the default offset to oldest, and
>>> disregarding checkpointed offsets, you'll always re-process the entire
>>> bootstrap stream from offset 0. This is likely what you want.
>>> 
>>> This is undocumented right now. Sorry about that.
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:
>>> 
>>>> Hi, I’m having some trouble getting bootstrapping to work.  To start with
>>>> a simple example, I’ve augmented hello-samza to get the Wikipedia feed
>>>> parser to bootstrap from the wikipedia-raw topic.  When I peek into the
>>>> logs, I see the following lines:
>>>> 
>>>> 2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata:
>>>> Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw,
>>>> partitionMetadata={Partition
>>>> [partition=0]=SystemStreamPartitionMetadata
>>>> [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>>>> 
>>>> 2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with:
>>>> useBatching=false, useBootstrapping=true, usePriority=true
>>>> 
>>>> 2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully
>>>> caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>>>> 
>>>> 2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and
>>>> partition [wikipedia-raw,0]
>>>> 2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from
>>>> offset 42 for topic and partition [wikipedia-raw,0]. Using it to
>>>> instantiate consumer.
>>>> 
>>>> So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>>>> but as the stream processor is finishing initialization, it seems to
>>>> think that wikipedia-raw has been fully bootstrapped and so it starts
>>>> from offset 42.  Any ideas?  My config is below, full logs on pastebin
>>>> (http://pastebin.com/ZatAA0Ak)
>>>> 
>>>> # Job
>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>> job.name=wikipedia-parser
>>>> 
>>>> # Task
>>>> task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>>>> task.inputs=kafka.wikipedia-raw
>>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>>> task.checkpoint.system=kafka
>>>> # Normally, this would be 3, but we have only one broker.
>>>> task.checkpoint.replication.factor=1
>>>> 
>>>> # Das boot
>>>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>>> 
>>>> # Metrics
>>>> metrics.reporters=snapshot,jmx
>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>>> 
>>>> # Serializers
>>>> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>>>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>>> 
>>>> # Systems
>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>> systems.kafka.samza.msg.serde=json
>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>>> #systems.kafka.consumer.auto.offset.reset=smallest
>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>> systems.kafka.producer.producer.type=sync
>>>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>>>> systems.kafka.producer.batch.num.messages=1
>>>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>>>> 
>>>> 
>>>> 
>>>> Thanks!
>>>> —T
>>> 
>> 
> 
