Hey TJ,

Samza has a setting called systems.%s.streams.%s.samza.offset.default. If it is undefined, Samza uses "upcoming", which means it starts just past the most recent message in the topic (offset 42 in your log), so it only sees messages written after the job starts. For a bootstrap stream, you probably want to set this to "oldest".
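Since the setting name is a format pattern, here is a minimal Java sketch of how it expands for your job. The class and method names (StreamConfigKeys, offsetDefaultKey, offsetDefault) are hypothetical, and this is not how Samza itself reads its config. It only shows that the first %s is the system name, the second is the stream name, and "streams" is a literal segment between them. The "upcoming" fallback mirrors the default described above.

```java
import java.util.Properties;

// Hypothetical helper, not part of Samza's API: expands the per-stream
// offset.default pattern and looks it up in a plain Properties object.
class StreamConfigKeys {
    // First %s is the system (e.g. "kafka"), second is the stream name.
    static final String OFFSET_DEFAULT = "systems.%s.streams.%s.samza.offset.default";

    static String offsetDefaultKey(String system, String stream) {
        return String.format(OFFSET_DEFAULT, system, stream);
    }

    // Assumed behavior from the reply: an undefined setting acts like "upcoming".
    static String offsetDefault(Properties config, String system, String stream) {
        return config.getProperty(offsetDefaultKey(system, stream), "upcoming");
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(offsetDefault(config, "kafka", "wikipedia-raw")); // upcoming

        config.setProperty(offsetDefaultKey("kafka", "wikipedia-raw"), "oldest");
        System.out.println(offsetDefault(config, "kafka", "wikipedia-raw")); // oldest

        // systems.kafka.streams.wikipedia-raw.samza.offset.default
        System.out.println(offsetDefaultKey("kafka", "wikipedia-raw"));
    }
}
```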
systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest

This tells Samza to start from offset 0, not offset 42, when there is no checkpoint available (i.e., when you start your job for the first time). Once there is a checkpoint (e.g., you've run your job for a while and a checkpoint manager is configured), Samza will use the checkpoint instead. For example, if you've read to offset 55 and then restart, the SamzaContainer will begin bootstrapping from offset 55, not offset 0.

To force bootstrapping to ALWAYS begin from offset 0, also use this setting:

systems.kafka.streams.wikipedia-raw.samza.reset.offset=true

This tells Samza to disregard the checkpointed offset for the stream when it starts up. By forcing the default offset to oldest and disregarding checkpointed offsets, you'll always re-process the entire bootstrap stream from offset 0, which is likely what you want. Your config already sets reset.offset=true, but because offset.default isn't set, Samza falls back to "upcoming", which is why the container starts at offset 42.

This is undocumented right now. Sorry about that.

Cheers,
Chris

On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:

>Hi, I'm having some trouble getting bootstrapping to work. To start with
>a simple example, I've augmented hello-samza to get the Wikipedia feed
>parser to bootstrap from the wikipedia-raw topic.
>When I peek into the logs, I see the following lines:
>
>2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata: Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>
>2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=true, usePriority=true
>
>2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>
>2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and partition [wikipedia-raw,0]
>2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from offset 42 for topic and partition [wikipedia-raw,0]. Using it to instantiate consumer.
>
>So it seems that Samza recognizes that I want to bootstrap wikipedia-raw,
>but as the stream processor is finishing initialization, it seems to
>think that wikipedia-raw has been fully bootstrapped and so it starts
>from offset 42. Any ideas? My config is below, full logs on pastebin
>(http://pastebin.com/ZatAA0Ak)
>
># Job
>job.factory.class=org.apache.samza.job.local.LocalJobFactory
>job.name=wikipedia-parser
>
># Task
>task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>task.inputs=kafka.wikipedia-raw
>task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>task.checkpoint.system=kafka
># Normally, this would be 3, but we have only one broker.
>task.checkpoint.replication.factor=1
>
># Das boot
>systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>
># Metrics
>metrics.reporters=snapshot,jmx
>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>metrics.reporter.snapshot.stream=kafka.metrics
>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>
># Serializers
>serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>
># Systems
>systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>systems.kafka.samza.msg.serde=json
>systems.kafka.consumer.zookeeper.connect=localhost:2181/
>#systems.kafka.consumer.auto.offset.reset=smallest
>
>systems.kafka.producer.metadata.broker.list=localhost:9092
>systems.kafka.producer.producer.type=sync
># Normally, we'd set this much higher, but we want things to look snappy in the demo.
>systems.kafka.producer.batch.num.messages=1
>systems.kafka.streams.metrics.samza.msg.serde=metrics
>
>Thanks!
>-T
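To recap the rules described in the reply, here is a simplified Java model of how the starting offset is chosen for one partition. It assumes the behavior is exactly as described in this thread: use the checkpoint unless reset.offset=true, otherwise use samza.offset.default, which falls back to "upcoming". StartOffsetModel and startingOffset are hypothetical names; this is a sketch, not Samza's implementation. The oldest and upcoming offsets come from the KafkaSystemAdmin log line above.

```java
import java.util.OptionalLong;

// Simplified model (hypothetical, not Samza's code) of the offset rules
// described in this thread, for a single stream partition.
class StartOffsetModel {
    // The two samza.offset.default values discussed in the thread.
    enum OffsetDefault { OLDEST, UPCOMING }

    static long startingOffset(long oldestOffset, long upcomingOffset,
                               OptionalLong checkpoint, boolean resetOffset,
                               OffsetDefault offsetDefault) {
        // A checkpoint wins unless reset.offset=true says to ignore it.
        if (checkpoint.isPresent() && !resetOffset) {
            return checkpoint.getAsLong();
        }
        // No usable checkpoint: fall back to samza.offset.default.
        return offsetDefault == OffsetDefault.OLDEST ? oldestOffset : upcomingOffset;
    }

    public static void main(String[] args) {
        long oldest = 0, upcoming = 42; // from the "Got metadata" log line
        OptionalLong none = OptionalLong.empty();
        OptionalLong at55 = OptionalLong.of(55); // the reply's restart example

        // Config from the question: reset.offset=true, offset.default unset.
        System.out.println(startingOffset(oldest, upcoming, none, true, OffsetDefault.UPCOMING)); // 42
        // Adding offset.default=oldest.
        System.out.println(startingOffset(oldest, upcoming, none, true, OffsetDefault.OLDEST)); // 0
        // Checkpoint at 55, no reset: resume from the checkpoint.
        System.out.println(startingOffset(oldest, upcoming, at55, false, OffsetDefault.OLDEST)); // 55
        // Checkpoint at 55, reset.offset=true: re-bootstrap from oldest.
        System.out.println(startingOffset(oldest, upcoming, at55, true, OffsetDefault.OLDEST)); // 0
    }
}
```

With the config from the question, the model lands on 42, matching the GetOffset log lines; adding offset.default=oldest alongside reset.offset=true moves it to 0 whether or not a checkpoint exists.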
