Hey Garry,

Yep, this is definitely a bug. I've opened:

  https://issues.apache.org/jira/browse/SAMZA-145

I've also posted a patch, and will merge it in as soon as I get a +1 (by tomorrow).

Cheers,
Chris

On 2/12/14 5:10 PM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:

>Hi Chris,
>
>OK, so that did have an effect. :)
>
>After setting the Kafka-level offset reset to 'smallest', I do indeed see
>messages from the positive and negative word streams arrive in my stream
>task. But they seem to be appearing interleaved with the other,
>non-bootstrapped stream, so I think I'm seeing each stream being read from
>the smallest offset but with no priority given to the bootstrapped streams.
>I also still see this in the container logs; am I correct in assuming it
>should be showing selection of the BootstrapChooser?
>
>DefaultChooser [INFO] Building default chooser with: useBatching=false,
>useBootstrapping=false, usePriority=false
>
>Thanks for the help with this, hugely appreciated!
>
>Garry
>
>
>-----Original Message-----
>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>Sent: 12 February 2014 22:07
>To: dev@samza.incubator.apache.org
>Subject: Re: Using bootstrap streams
>
>Hey Garry,
>
>I've opened:
>
>  https://issues.apache.org/jira/browse/SAMZA-144
>
>to track the stream-level Kafka configuration override issue.
>
>Let me know if the "smallest" setting works for you.
>
>Cheers,
>Chris
>
>On 2/12/14 11:21 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>
>>Hey Garry,
>>
>>I just noticed that we don't actually support stream-level overrides
>>for Kafka configs.
>>
>>To test this, you'll have to set the consumer settings at the system
>>level:
>>
>>systems.kafka.consumer.auto.offset.reset=smallest
>>
>>
>>Note that this will cause you to read all data from kafka.tweets the
>>first time you run your job, as well.
>>I think this is probably what you want, but if not, you'd have to define
>>two systems: one for the bootstrap streams, and one for the tweet stream,
>>so that you could configure the bootstrap system to have the "smallest"
>>reset setting.
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 11:06 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>>
>>>Hey Garry,
>>>
>>>I believe this is a similar issue to SAMZA-142.
>>>
>>>Can you try adding a config to set auto.offset.reset to smallest?
>>>Something like:
>>>
>>>
>>>systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>>>
>>>systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>>>
>>>
>>>This should change this log line:
>>>
>>>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>>>
>>>
>>>To something like:
>>>
>>>Final offset to be returned for Topic and Partition [positive-words,0] = 0
>>>
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/12/14 10:44 AM, "Garry Turkington" <g.turking...@improvedigital.com>
>>>wrote:
>>>
>>>>Hi Chris,
>>>>
>>>>Sorry for the wrong log file!
>>>>
>>>>Samza container log is at:
>>>>http://pastebin.com/D5bAJd7U
>>>>
>>>>I do notice that it mentions returning the highest offset for the
>>>>supposedly bootstrapped streams, which I presume shouldn't be happening.
>>>>
>>>>Thanks,
>>>>Garry
>>>>
>>>>-----Original Message-----
>>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>>Sent: 12 February 2014 17:42
>>>>To: dev@samza.incubator.apache.org
>>>>Subject: Re: Using bootstrap streams
>>>>
>>>>Hey Garry,
>>>>
>>>>So far, everything looks normal.
>>>>
>>>>The container log you sent me actually appears to be the output
>>>>of the run-job.sh command. Since you're using YarnJobFactory, this is
>>>>not actually the container log. Could you grab the log from the
>>>>container that's running in YARN, and stick that in pastebin?
>>>>You can usually find this by going to YARN's RM (http://localhost:8088)
>>>>and finding the link to your ApplicationMaster. This will link to the
>>>>logs for each container that's running your tasks.
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/11/14 2:11 PM, "Garry Turkington" <g.turking...@improvedigital.com>
>>>>wrote:
>>>>
>>>>>Hi Chris,
>>>>>
>>>>>Following up on this; sorry for the delay, I'm travelling this week.
>>>>>
>>>>>Main task config:
>>>>>http://pastebin.com/enQXLcbZ
>>>>>
>>>>>Container log:
>>>>>http://pastebin.com/YLiKp0CS
>>>>>
>>>>>I'm putting the positive and negative words into the bootstrap
>>>>>streams prior to running the job, and I've confirmed the data is in the
>>>>>Kafka stream via kafka-console-consumer.sh with the --from-beginning
>>>>>option.
>>>>>
>>>>>Thanks for any input!
>>>>>Garry
>>>>>
>>>>>-----Original Message-----
>>>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>>>Sent: 10 February 2014 23:25
>>>>>To: dev@samza.incubator.apache.org
>>>>>Subject: Re: Using bootstrap streams
>>>>>
>>>>>Hey Garry,
>>>>>
>>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>>
>>>>>Bootstrap stream messages will be delivered to the process() method
>>>>>just like any other. The only difference is that you're supposed to get
>>>>>all of them (from offset 0 through the last offset) before you get any
>>>>>messages from non-bootstrap streams.
>>>>>Your positive/negative example sounds like a reasonable use case for
>>>>>a bootstrap stream.
>>>>>
>>>>>A few questions:
>>>>>
>>>>>1. Can you post the container logs and the full configuration file
>>>>>for your job somewhere (e.g. a GitHub gist)?
>>>>>2. Are you putting data into the positive-words and negative-words
>>>>>topics before you start the Samza job?
>>>>>
>>>>>Also, you can call envelope.getSystemStreamPartition().getStream()
>>>>>directly (no need to call getSystemStream()).
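The bootstrap guarantee described above (every bootstrap message up to the last offset arrives before anything from the other inputs) and the tip about switching on the stream name can be illustrated with a toy Java model. This is a sketch under stated assumptions, not Samza's BootstrapChooser or StreamTask API: `Envelope`, `deliveryOrder`, `SentimentTaskSketch`, `runDemo`, and the `tweets` stream name are hypothetical stand-ins, and the round-robin order used for the remaining streams is an arbitrary choice for illustration. With bootstrapping off it reproduces the interleaving Garry reports at the top of the thread; with it on, both word lists are loaded before the first tweet is scored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical stand-ins, not Samza's IncomingMessageEnvelope,
// BootstrapChooser, or StreamTask.
final class BootstrapOrderSketch {

    /** Minimal envelope: the stream a message came from plus its payload. */
    static final class Envelope {
        final String stream;
        final String message;
        Envelope(String stream, String message) { this.stream = stream; this.message = message; }
    }

    /** With bootstrapping on, all bootstrap messages are delivered before any others. */
    static List<Envelope> deliveryOrder(Map<String, List<String>> pending,
                                        Set<String> bootstrapStreams, boolean useBootstrapping) {
        Map<String, List<String>> boot = new LinkedHashMap<>();
        Map<String, List<String>> rest = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : pending.entrySet()) {
            if (useBootstrapping && bootstrapStreams.contains(e.getKey())) {
                boot.put(e.getKey(), e.getValue());
            } else {
                rest.put(e.getKey(), e.getValue());
            }
        }
        List<Envelope> out = roundRobin(boot); // bootstrap streams drained completely first
        out.addAll(roundRobin(rest));
        return out;
    }

    /** Assumed policy for illustration: take one message per stream in turn. */
    private static List<Envelope> roundRobin(Map<String, List<String>> streams) {
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        streams.forEach((s, msgs) -> queues.put(s, new ArrayDeque<>(msgs)));
        List<Envelope> out = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Map.Entry<String, Deque<String>> q : queues.entrySet()) {
                String m = q.getValue().pollFirst();
                if (m != null) {
                    out.add(new Envelope(q.getKey(), m));
                    progressed = true;
                }
            }
        }
        return out;
    }

    /** Dispatch on the stream name; env.stream stands in for getSystemStreamPartition().getStream(). */
    static final class SentimentTaskSketch {
        final Set<String> positive = new HashSet<>();
        final Set<String> negative = new HashSet<>();
        final List<String> scored = new ArrayList<>();

        void process(Envelope env) {
            switch (env.stream) {
                case "positive-words": positive.add(env.message); break;
                case "negative-words": negative.add(env.message); break;
                default: scored.add(env.message + "=" + score(env.message));
            }
        }

        int score(String text) {
            int s = 0;
            for (String w : text.toLowerCase().split("\\s+")) {
                if (positive.contains(w)) s++;
                if (negative.contains(w)) s--;
            }
            return s;
        }
    }

    /** Feeds one batch through the task and returns the scored tweets in delivery order. */
    static List<String> runDemo(boolean useBootstrapping) {
        Map<String, List<String>> pending = new LinkedHashMap<>();
        pending.put("tweets", Arrays.asList("good day", "bad day"));
        pending.put("positive-words", Arrays.asList("good"));
        pending.put("negative-words", Arrays.asList("bad"));
        Set<String> bootstrap = new HashSet<>(Arrays.asList("positive-words", "negative-words"));
        SentimentTaskSketch task = new SentimentTaskSketch();
        for (Envelope env : deliveryOrder(pending, bootstrap, useBootstrapping)) {
            task.process(env);
        }
        return task.scored;
    }

    public static void main(String[] args) {
        System.out.println("bootstrapping off: " + runDemo(false)); // [good day=0, bad day=-1]
        System.out.println("bootstrapping on:  " + runDemo(true));  // [good day=1, bad day=-1]
    }
}
```

The task code is identical in both runs; only the delivery order changes, which matches the point that bootstrap messages reach process() like any other message.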
>>>>>
>>>>>Cheers,
>>>>>Chris
>>>>>
>>>>>On 2/10/14 3:18 AM, "Garry Turkington" <g.turking...@improvedigital.com>
>>>>>wrote:
>>>>>
>>>>>>Hi,
>>>>>>
>>>>>>I was building a task to do some sentiment analysis on incoming data.
>>>>>>I have a corpus each of positive and negative words to which the
>>>>>>task needs access. This seemed like a good fit for bootstrap
>>>>>>streams, but I can't seem to get them to work.
>>>>>>
>>>>>>I have my job configured with the 3 Kafka topics in task.inputs, and
>>>>>>that seems to work: throwing data at any of the topics hits the task.
>>>>>>
>>>>>>But setting up the 2 reference streams as bootstrap streams doesn't
>>>>>>seem to be working. Here's the relevant part of the config; I want to
>>>>>>read the entire message history each time:
>>>>>>
>>>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>>>
>>>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>>>>
>>>>>>Do bootstrap streams get handled in any special way? I'm assuming
>>>>>>here that the messages will arrive in the process method on
>>>>>>StreamTask just like any other, and that I can handle them differently
>>>>>>by switching on
>>>>>>envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>>>>Looking at the code, it looks the same, with the BootstrapChooser
>>>>>>doing its magic to determine which message is delivered to the task,
>>>>>>but the actual delivery seems the same.
>>>>>>
>>>>>>What am I missing?
>>>>>>
>>>>>>Thanks,
>>>>>>Garry
>>>>>>
>>>>>
>>>>>
>>>>>-----
>>>>>No virus found in this message.
>>>>>Checked by AVG - www.avg.com
>>>>>Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date: 02/10/14
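The two offsets discussed mid-thread for [positive-words,0] (2006 versus 0) come from the consumer's fallback when it has no checkpoint it is allowed to use. The Java sketch below models that decision under stated assumptions: samza.reset.offset=true is modelled as "ignore the checkpoint", and the fallback follows Kafka 0.8's auto.offset.reset, whose default is largest, which is consistent with the 2006 Garry saw before changing it. "smallest" returns the earliest retained offset, which is 0 only if nothing has been deleted from the topic yet. `StartingOffsetSketch` and `startingOffset` are hypothetical names, not Samza or Kafka APIs.

```java
import java.util.OptionalLong;

// Hypothetical model: names are illustrative, not Samza or Kafka APIs.
final class StartingOffsetSketch {

    /**
     * Picks where a consumer starts reading one partition.
     *
     * @param checkpoint      checkpointed position, or empty if none exists
     * @param resetOffset     models samza.reset.offset=true (ignore any checkpoint)
     * @param autoOffsetReset "smallest" or "largest", as in Kafka 0.8's auto.offset.reset
     * @param earliest        earliest offset still retained in the partition
     * @param latest          latest offset, where newly produced messages will land
     */
    static long startingOffset(OptionalLong checkpoint, boolean resetOffset,
                               String autoOffsetReset, long earliest, long latest) {
        if (checkpoint.isPresent() && !resetOffset) {
            return checkpoint.getAsLong(); // resume from the checkpoint
        }
        switch (autoOffsetReset) {
            case "smallest":
                return earliest; // replay everything still retained
            case "largest":
                return latest;   // skip history and read only new messages
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Offsets from the log lines quoted in the thread; reset.offset=true, so no checkpoint is used.
        System.out.println(startingOffset(OptionalLong.empty(), true, "largest", 0L, 2006L));  // 2006
        System.out.println(startingOffset(OptionalLong.empty(), true, "smallest", 0L, 2006L)); // 0
    }
}
```

In this model, samza.reset.offset=true only discards the checkpoint; where reading starts is still decided by auto.offset.reset, which is why setting both was needed.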
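The DefaultChooser log line Garry quotes at the top of the thread (useBootstrapping=false) is the symptom tracked in SAMZA-145: with positive-words and negative-words both marked samza.bootstrap=true, bootstrapping would be expected to be on. The hypothetical Java helper below only illustrates that expectation by scanning a flat config map for systems.<system>.streams.<stream>.samza.bootstrap=true keys. It is not how Samza's DefaultChooser is actually built, and `BootstrapFlagSketch`, `bootstrapStreams`, and `useBootstrapping` are invented names.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: illustrates the expected flag value, not Samza's code.
final class BootstrapFlagSketch {

    // systems.<system>.streams.<stream>.samza.bootstrap; stream names may contain dots.
    private static final Pattern BOOTSTRAP_KEY =
            Pattern.compile("systems\\.([^.]+)\\.streams\\.(.+)\\.samza\\.bootstrap");

    /** Returns "system.stream" for every stream configured with samza.bootstrap=true. */
    static Set<String> bootstrapStreams(Map<String, String> config) {
        Set<String> streams = new LinkedHashSet<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            Matcher m = BOOTSTRAP_KEY.matcher(e.getKey());
            if (m.matches() && Boolean.parseBoolean(e.getValue().trim())) {
                streams.add(m.group(1) + "." + m.group(2));
            }
        }
        return streams;
    }

    /** Bootstrapping should be enabled as soon as any stream is a bootstrap stream. */
    static boolean useBootstrapping(Map<String, String> config) {
        return !bootstrapStreams(config).isEmpty();
    }

    public static void main(String[] args) {
        // The relevant lines of Garry's original config.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("systems.kafka.streams.positive-words.samza.bootstrap", "true");
        config.put("systems.kafka.streams.positive-words.samza.reset.offset", "true");
        config.put("systems.kafka.streams.negative-words.samza.bootstrap", "true");
        config.put("systems.kafka.streams.negative-words.samza.reset.offset", "true");

        System.out.println("useBootstrapping=" + useBootstrapping(config)); // useBootstrapping=true
        System.out.println(bootstrapStreams(config)); // [kafka.positive-words, kafka.negative-words]
    }
}
```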