Hey Chris, thanks for the help. I've done a bit more experimenting and found that, with my setup, I have to set *both* offset.default=oldest and reset.offset=true to consistently bootstrap the stream from the beginning. Here are the relevant config lines:

systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest

And here are the log outputs.

Only offset.default=oldest: http://pastebin.com/yxnw2Ypu

This behaves as expected: my task had already checkpointed, and I was not writing anything new to the wikipedia-raw topic, so the task did not read any messages.

Only reset.offset=true: http://pastebin.com/sw50RiYz

2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using default offset UPCOMING for SystemStream [system=kafka, stream=wikipedia-raw].
2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using reset offset true for SystemStream [system=kafka, stream=wikipedia-raw].

In this case, setting reset.offset=true alone is not enough: the stream still starts from UPCOMING and thus does not read any messages from before the checkpoint.

Both: http://pastebin.com/WF2MQZcN

2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using default offset OLDEST for SystemStream [system=kafka, stream=wikipedia-raw].
2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using reset offset true for SystemStream [system=kafka, stream=wikipedia-raw].

This case both resets the offset back to 0 and consumes messages starting from the 0th message in wikipedia-raw.

Is this the expected behavior?

Again, thanks for the help!
-T

On Apr 10, 2014, at 2:01 PM, Chris Riccomini <[email protected]> wrote:

> Hey TJ,
>
> Samza has a setting called systems.%s.streams.%s.samza.offset.default. If
> undefined, it uses "upcoming", which means start after the most recent
> message in the topic (i.e., only read newly written messages). For a
> bootstrap stream, you probably want to set this to "oldest".
>
> systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>
> This will tell Samza to start from offset 0, not offset 42, in the case
> where there is no checkpoint available (i.e., when you start your job for
> the first time). Once there is a checkpoint (e.g., you've run your job for
> a period of time, and a checkpoint manager is configured), Samza will use
> the checkpoint. For example, if you've read to offset 55 and then
> restart, the SamzaContainer will begin bootstrapping from offset 55 (not
> offset 0). To force the bootstrapping to ALWAYS begin from offset 0, use
> this setting:
>
> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>
> This will tell Samza to disregard the checkpointed offset for the stream
> when it starts up. By setting the default offset to oldest and disregarding
> checkpointed offsets, you'll always re-process the entire bootstrap stream
> from offset 0. This is likely what you want.
>
> This is undocumented right now. Sorry about that.
>
> Cheers,
> Chris
>
> On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:
>
>> Hi, I'm having some trouble getting bootstrapping to work. To start with
>> a simple example, I've augmented hello-samza to get the Wikipedia feed
>> parser to bootstrap from the wikipedia-raw topic. When I peek into the
>> logs, I see the following lines:
>>
>> 2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata: Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>>
>> 2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=true, usePriority=true
>>
>> 2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>>
>> 2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and partition [wikipedia-raw,0]
>> 2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from offset 42 for topic and partition [wikipedia-raw,0]. Using it to instantiate consumer.
>>
>> So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>> but as the stream processor is finishing initialization, it seems to
>> think that wikipedia-raw has been fully bootstrapped and so it starts
>> from offset 42. Any ideas? My config is below, full logs on pastebin
>> (http://pastebin.com/ZatAA0Ak)
>>
>> # Job
>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> job.name=wikipedia-parser
>>
>> # Task
>> task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>> task.inputs=kafka.wikipedia-raw
>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> task.checkpoint.system=kafka
>> # Normally, this would be 3, but we have only one broker.
>> task.checkpoint.replication.factor=1
>>
>> # Das boot
>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>
>> # Metrics
>> metrics.reporters=snapshot,jmx
>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> metrics.reporter.snapshot.stream=kafka.metrics
>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>
>> # Serializers
>> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>
>> # Systems
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.samza.msg.serde=json
>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> #systems.kafka.consumer.auto.offset.reset=smallest
>> systems.kafka.producer.metadata.broker.list=localhost:9092
>> systems.kafka.producer.producer.type=sync
>> # Normally, we'd set this much higher, but we want things to look snappy
>> # in the demo.
>> systems.kafka.producer.batch.num.messages=1
>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>>
>> Thanks!
>> -T
>
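The three cases TJ tested line up with Chris's explanation: reset.offset only discards the checkpoint, and offset.default then decides where to start instead. The Java sketch below is a hypothetical, simplified model written purely to illustrate that decision; it is not Samza's OffsetManager or BootstrappingChooser, and the class, enum, record, and method names are invented. It assumes a checkpoint records the offset to resume from (as in Chris's offset-55 example), uses the partition metadata from the log (oldestOffset=0, newestOffset=41, upcomingOffset=42), and treats "fully caught up" as "nothing left to read", which is only an inference from the log lines.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified model of how a starting offset is picked for one
 * partition of an input stream, based on the behavior described in this thread.
 * Not Samza's actual implementation; names are illustrative only.
 */
public class StartingOffsetModel {

    /** Stand-in for the values of ...samza.offset.default (UPCOMING when unset). */
    enum OffsetDefault { OLDEST, UPCOMING }

    /** Mirrors the per-partition fields printed by KafkaSystemAdmin in the log. */
    record PartitionMetadata(long oldestOffset, long newestOffset, long upcomingOffset) {}

    /** A checkpoint wins unless resetOffset is true; otherwise offsetDefault decides. */
    static long startingOffset(OptionalLong checkpoint, boolean resetOffset,
                               OffsetDefault offsetDefault, PartitionMetadata meta) {
        if (checkpoint.isPresent() && !resetOffset) {
            return checkpoint.getAsLong();
        }
        // No usable checkpoint: fall back to the configured default.
        return offsetDefault == OffsetDefault.OLDEST
                ? meta.oldestOffset()
                : meta.upcomingOffset();
    }

    /** Assumed reading of "fully caught up": the start offset leaves nothing to read. */
    static boolean caughtUp(long startingOffset, PartitionMetadata meta) {
        return startingOffset >= meta.upcomingOffset();
    }

    public static void main(String[] args) {
        PartitionMetadata meta = new PartitionMetadata(0, 41, 42);
        // Hypothetical checkpoint left behind after the whole topic was processed.
        OptionalLong checkpoint = OptionalLong.of(42);

        report("offset.default=oldest only",
                startingOffset(checkpoint, false, OffsetDefault.OLDEST, meta), meta);
        report("reset.offset=true only",
                startingOffset(checkpoint, true, OffsetDefault.UPCOMING, meta), meta);
        report("both",
                startingOffset(checkpoint, true, OffsetDefault.OLDEST, meta), meta);
    }

    private static void report(String label, long start, PartitionMetadata meta) {
        System.out.println(label + ": start=" + start + ", caughtUp=" + caughtUp(start, meta));
    }
}
```

Running it prints start=42 (caught up, nothing read) for each single setting and start=0 for both together, matching the logs above. In this model, re-reading from offset 0 on every start requires both settings, which is what Chris's final paragraph recommends.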
