Hey TJ,

Correct, you've got it.

* `reset.offset` tells Samza to ignore the checkpointed offset
* `offset.default` tells Samza what to do when there's no checkpointed offset (because you've ignored it)
* `bootstrap=true` tells Samza to prioritize the stream above all non-bootstrap streams, and to forcibly read all of its messages until the stream is caught up before allowing any other messages to be processed.

Reflecting on this now, it seems a bit complicated. We could have Samza set these automatically when `bootstrap=true` is set.

Cheers,
Chris

On 4/11/14 10:11 AM, "TJ Giuli" <[email protected]> wrote:

>Hey, Chris, thanks for the help. I’ve done a bit more playing around and
>I’ve found that with my experimental setup I have to set *both*
>offset.default=oldest and reset.offset=true to consistently bootstrap the
>stream from the beginning. Here are the config lines of importance:
>
>systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>
>And here are the log outputs
>
>Only offset.default=oldest: http://pastebin.com/yxnw2Ypu
>This behaves as expected — my task had checkpointed and I was not filling
>the wikipedia-raw topic with anything else, so the task did not read in
>any messages.
>
>Only reset.offset=true: http://pastebin.com/sw50RiYz
>2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using default offset UPCOMING
>for SystemStream [system=kafka, stream=wikipedia-raw].
>2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using reset offset true for
>SystemStream [system=kafka, stream=wikipedia-raw].
>
>In this case, just setting reset.offset=true is not enough — the stream
>still starts from UPCOMING and thus does not read any messages from
>before the checkpoint.
>
>Both: http://pastebin.com/WF2MQZcN
>2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using default offset OLDEST
>for SystemStream [system=kafka, stream=wikipedia-raw].
>2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using reset offset true for
>SystemStream [system=kafka, stream=wikipedia-raw].
>
>This case both resets the offset back to 0 and consumes messages from the
>0th message in wikipedia-raw. Is this the expected behavior? Again,
>thanks for the help!
>—T
>
>On Apr 10, 2014, at 2:01 PM, Chris Riccomini <[email protected]> wrote:
>
>> Hey TJ,
>>
>> Samza has a setting called systems.%s.streams.%s.samza.offset.default. If
>> undefined, it uses "upcoming", which means start from the most recent
>> message in the topic. For a bootstrap stream, you probably want to set
>> this to "oldest".
>>
>> systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>>
>> This will tell Samza to start from offset 0, not offset 42, in the case
>> where there is no checkpoint available (i.e. when you start your job for
>> the first time). Once there is a checkpoint (e.g. you've run your job for
>> a period of time, and a checkpoint manager is configured), Samza will use
>> the checkpoint. For example, if you've read to offset 55, and then
>> restart, the SamzaContainer will begin bootstrapping from offset 55 (not
>> offset 0). To force the bootstrapping to ALWAYS begin from offset 0, use
>> this setting:
>>
>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>
>> This will tell Samza to disregard the checkpointed offset for the stream
>> when it starts up. By forcing the offset to oldest, and disregarding
>> checkpointed offsets, you'll always re-process the entire bootstrap
>> stream from offset 0. This is likely what you want.
>>
>> This is undocumented right now. Sorry about that.
>>
>> Cheers,
>> Chris
>>
>> On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:
>>
>>> Hi, I'm having some trouble getting bootstrapping to work. To start with
>>> a simple example, I've augmented hello-samza to get the Wikipedia feed
>>> parser to bootstrap from the wikipedia-raw topic.
>>> When I peek into the logs, I see the following lines:
>>>
>>> 2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata:
>>> Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw,
>>> partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
>>> [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>>>
>>> 2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with:
>>> useBatching=false, useBootstrapping=true, usePriority=true
>>>
>>> 2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully
>>> caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>>>
>>> 2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and
>>> partition [wikipedia-raw,0]
>>> 2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from
>>> offset 42 for topic and partition [wikipedia-raw,0]. Using it to
>>> instantiate consumer.
>>>
>>> So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>>> but as the stream processor is finishing initialization, it seems to
>>> think that wikipedia-raw has been fully bootstrapped and so it starts
>>> from offset 42. Any ideas? My config is below, full logs on pastebin
>>> (http://pastebin.com/ZatAA0Ak)
>>>
>>> # Job
>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>> job.name=wikipedia-parser
>>>
>>> # Task
>>> task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>>> task.inputs=kafka.wikipedia-raw
>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>> task.checkpoint.system=kafka
>>> # Normally, this would be 3, but we have only one broker.
>>> task.checkpoint.replication.factor=1
>>>
>>> # Das boot
>>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>>
>>> # Metrics
>>> metrics.reporters=snapshot,jmx
>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>> metrics.reporter.snapshot.stream=kafka.metrics
>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>>
>>> # Serializers
>>> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>>
>>> # Systems
>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> systems.kafka.samza.msg.serde=json
>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>> #systems.kafka.consumer.auto.offset.reset=smallest
>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>> systems.kafka.producer.producer.type=sync
>>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>>> systems.kafka.producer.batch.num.messages=1
>>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>>>
>>> Thanks!
>>> —T
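
The way `reset.offset` and `offset.default` combine in this thread can be sketched as a small toy model in Python. This is only an illustration of the behavior described in the messages above, not Samza's actual code: `starting_offset` and its parameters are hypothetical names, and the checkpointed value of 42 is an assumption based on the task having already read to the end of the topic.

```python
from typing import Optional


def starting_offset(
    checkpoint: Optional[int],
    reset_offset: bool,
    offset_default: str,
    oldest: int,
    upcoming: int,
) -> int:
    """Toy model (not Samza code) of where a stream starts reading."""
    # reset.offset=true: disregard whatever was checkpointed.
    if reset_offset:
        checkpoint = None
    # A surviving checkpoint always wins over offset.default.
    if checkpoint is not None:
        return checkpoint
    # No usable checkpoint: fall back to offset.default.
    if offset_default == "oldest":
        return oldest
    if offset_default == "upcoming":
        return upcoming
    raise ValueError(f"unsupported offset.default: {offset_default!r}")


# Offsets from the quoted log: oldestOffset=0, upcomingOffset=42.
# Assumption: the task had already checkpointed at offset 42.
only_default = starting_offset(42, False, "oldest", oldest=0, upcoming=42)
only_reset = starting_offset(42, True, "upcoming", oldest=0, upcoming=42)
both = starting_offset(42, True, "oldest", oldest=0, upcoming=42)
print(only_default, only_reset, both)
```

Under these assumptions the three cases come out as 42, 42 and 0, which lines up with the three experiments reported above: only the combination of both settings starts from the beginning of the stream.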
