Hey Garry,

I just noticed that we don't actually support stream-level overrides for Kafka configs.
To work around this, you'll have to set the consumer settings at the system level:

  systems.kafka.consumer.auto.offset.reset=smallest

Note that this will cause you to read all data from kafka.tweets the first time you run your job, as well. I think this is probably what you want, but if not, you'd have to define two systems: one for the bootstrap streams, and one for the tweet stream, so that you could configure the bootstrap system to have the "smallest" reset setting.
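In case it helps, here's a rough, untested sketch of what that two-system setup might look like. The "kafka-bootstrap" system name and the ZooKeeper/broker addresses are just placeholders; adapt them to whatever your job already uses:

  # existing system, used for the tweet stream
  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.consumer.zookeeper.connect=localhost:2181
  systems.kafka.producer.metadata.broker.list=localhost:9092

  # second system pointing at the same Kafka cluster, used only for the bootstrap streams
  systems.kafka-bootstrap.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka-bootstrap.consumer.zookeeper.connect=localhost:2181
  systems.kafka-bootstrap.producer.metadata.broker.list=localhost:9092
  # the system-level reset setting applies only to this system
  systems.kafka-bootstrap.consumer.auto.offset.reset=smallest

  # the bootstrap/reset settings move over to the new system name
  systems.kafka-bootstrap.streams.positive-words.samza.bootstrap=true
  systems.kafka-bootstrap.streams.positive-words.samza.reset.offset=true
  systems.kafka-bootstrap.streams.negative-words.samza.bootstrap=true
  systems.kafka-bootstrap.streams.negative-words.samza.reset.offset=true

  # task.inputs then references both systems
  task.inputs=kafka.tweets,kafka-bootstrap.positive-words,kafka-bootstrap.negative-words

That way the tweet stream stays on the original "kafka" system, and only the bootstrap streams pick up the "smallest" reset behaviour.
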
Cheers,
Chris

On 2/12/14 11:06 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:

>Hey Garry,
>
>I believe this is a similar issue to SAMZA-142.
>
>Can you try adding a config to set auto.offset.reset to smallest?
>Something like:
>
>  systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>  systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>
>This should change this log line:
>
>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>
>To something like:
>
>Final offset to be returned for Topic and Partition [positive-words,0] = 0
>
>Cheers,
>Chris
>
>On 2/12/14 10:44 AM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:
>
>>Hi Chris,
>>
>>Sorry for the wrong log file!
>>
>>Samza container log is at:
>>http://pastebin.com/D5bAJd7U
>>
>>I do notice that it mentions returning the highest offset for the supposedly bootstrapped streams, which I presume shouldn't be happening.
>>
>>Thanks,
>>Garry
>>
>>-----Original Message-----
>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>Sent: 12 February 2014 17:42
>>To: dev@samza.incubator.apache.org
>>Subject: Re: Using bootstrap streams
>>
>>Hey Garry,
>>
>>So far, everything looks normal.
>>
>>The container log you sent me actually appears to be the output of the run-job.sh command. Since you're using YarnJobFactory, this is not actually the container log. Could you grab the log from the container that's running in YARN, and stick that in pastebin? You can usually find this by going to YARN's RM (http://localhost:8088) and finding the link to your ApplicationMaster. This will link to the logs for each container that's running your tasks.
>>
>>Cheers,
>>Chris
>>
>>On 2/11/14 2:11 PM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:
>>
>>>Hi Chris,
>>>
>>>Following up on this, sorry for the delay, travelling this week.
>>>
>>>Main task config:
>>>http://pastebin.com/enQXLcbZ
>>>
>>>Container log:
>>>http://pastebin.com/YLiKp0CS
>>>
>>>I'm putting the positive and negative words into the bootstrap streams prior to running the job -- and confirmed the data is in the Kafka stream via kafka-console-consumer.sh with the --from-beginning option.
>>>
>>>Thanks for any input!
>>>Garry
>>>
>>>-----Original Message-----
>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>Sent: 10 February 2014 23:25
>>>To: dev@samza.incubator.apache.org
>>>Subject: Re: Using bootstrap streams
>>>
>>>Hey Garry,
>>>
>>>It sounds like your understanding of bootstrap streams is correct.
>>>
>>>Bootstrap stream messages will be delivered to the process() method just like any other. The only difference is you're supposed to get all of them from 0-lastOffset before you get any messages from non-bootstrap streams. Your positive/negative example sounds like a reasonable use case for a bootstrap stream.
>>>
>>>A few questions:
>>>
>>>1. Can you post the container logs and the full configuration file for your job somewhere (e.g. a GitHub gist)?
>>>2. Are you putting data into the positive-words and negative-words topics before you start the Samza job?
>>>
>>>Also, you can do envelope.getSystemStreamPartition().getStream() directly (no need to call getSystemStream()).
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/10/14 3:18 AM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:
>>>
>>>>Hi,
>>>>
>>>>I was building a task to do some sentiment analysis on incoming data. I have a corpus each of positive and negative words to which the task needs access. This seemed like a good fit for bootstrap streams, but I can't seem to get them to work.
>>>>
>>>>I have my job configured with the 3 Kafka topics in task.inputs, and that seems to work: throwing data at any of the topics is hitting the task.
>>>>
>>>>But setting up the 2 reference streams as bootstrap doesn't seem to be working. Here's the relevant part of the config; I want to read the entire message history each time:
>>>>
>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>
>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>>
>>>>Do bootstrap streams get handled in any special way? I'm assuming that the messages will arrive in the process method on StreamTask just like any other and that I can handle them differently by switching on envelope.getSystemStreamPartition().getSystemStream().getStream(). Looking at the code, the BootstrapChooser does its magic to determine which message is delivered to the task, but the actual delivery seems the same.
>>>>
>>>>What am I missing?
>>>>
>>>>Thanks,
>>>>Garry