Hi Chris,

OK, so that did have an effect. :)

Adding the Kafka-level offset as 'smallest', I do indeed see messages from the positive and negative word streams arrive in my stream task. But they seem to be appearing interleaved with the other non-bootstrapped stream, so I think I'm seeing each stream being read from the smallest offset but with no priority given to the bootstrapped streams.

And I still see this in the container logs. Am I correct in assuming this should be showing selection of the BootstrapChooser?

DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=false

Thanks for the help with this, hugely appreciated!
Garry

-----Original Message-----
From: Chris Riccomini [mailto:criccom...@linkedin.com]
Sent: 12 February 2014 22:07
To: dev@samza.incubator.apache.org
Subject: Re: Using bootstrap streams

Hey Garry,

I've opened:

https://issues.apache.org/jira/browse/SAMZA-144

To track the stream-level Kafka configuration override issue. Let me know if the "smallest" setting works for you.

Cheers,
Chris

On 2/12/14 11:21 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:

>Hey Garry,
>
>I just noticed that we don't actually support stream-level overrides
>for Kafka configs.
>
>To test this, you'll have to set the consumer settings at the system level:
>
>systems.kafka.consumer.auto.offset.reset=smallest
>
>
>Note that this will cause you to read all data from kafka.tweets the
>first time you run your job, as well. I think this is probably what you
>want, but if not, you'd have to define two systems: one for the
>bootstrap streams, and one for the tweet stream, so that you could
>configure the bootstrap system to have the "smallest" reset setting.
>
>Cheers,
>Chris
>
>On 2/12/14 11:06 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>
>>Hey Garry,
>>
>>I believe this is a similar issue to SAMZA-142.
>>
>>Can you try adding a config to set auto.offset.reset to smallest?
>>Something like:
>>
>>
>>systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>>
>>systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>>
>>
>>This should change this log line:
>>
>>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>>
>>
>>To something like:
>>
>>Final offset to be returned for Topic and Partition [positive-words,0] = 0
>>
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 10:44 AM, "Garry Turkington"
>><g.turking...@improvedigital.com>
>>wrote:
>>
>>>Hi Chris,
>>>
>>>Sorry for the wrong log file!
>>>
>>>Samza container log is at:
>>>http://pastebin.com/D5bAJd7U
>>>
>>>I do notice that it mentions returning the highest offset for the
>>>supposedly bootstrapped streams which I presume shouldn't be happening.
>>>
>>>Thanks,
>>>Garry
>>>
>>>-----Original Message-----
>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>Sent: 12 February 2014 17:42
>>>To: dev@samza.incubator.apache.org
>>>Subject: Re: Using bootstrap streams
>>>
>>>Hey Garry,
>>>
>>>So far, everything looks normal.
>>>
>>>The container log you sent me actually appears to be the output
>>>of the run-job.sh command. Since you're using YarnJobFactory, this is
>>>not actually the container log. Could you grab the log from the
>>>container that's running in YARN, and stick that in pastebin? You can
>>>usually find this by going to YARN's RM (http://localhost:8088) and
>>>finding the link to your ApplicationMaster. This will link to the
>>>logs for each container that's running your tasks.
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/11/14 2:11 PM, "Garry Turkington"
>>><g.turking...@improvedigital.com>
>>>wrote:
>>>
>>>>Hi Chris,
>>>>
>>>>Following up on this, sorry for the delay, travelling this week.
>>>>
>>>>Main task config:
>>>>http://pastebin.com/enQXLcbZ
>>>>
>>>>Container log:
>>>>http://pastebin.com/YLiKp0CS
>>>>
>>>>I'm putting the positive and negative words into the bootstrap
>>>>streams prior to running the job -- and confirmed the data is in the
>>>>Kafka stream via kafka-console-consumer.sh with the --from-beginning option.
>>>>
>>>>Thanks for any input!
>>>>Garry
>>>>
>>>>-----Original Message-----
>>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>>Sent: 10 February 2014 23:25
>>>>To: dev@samza.incubator.apache.org
>>>>Subject: Re: Using bootstrap streams
>>>>
>>>>Hey Garry,
>>>>
>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>
>>>>Bootstrap stream messages will be delivered to the process() method
>>>>just like any other. The only difference is you're supposed to get
>>>>all of them from 0-lastOffset before you get any messages from
>>>>non-bootstrap streams.
>>>>Your positive/negative example sounds like a reasonable use case for
>>>>a bootstrap stream.
>>>>
>>>>A few questions:
>>>>
>>>>1. Can you post the container logs and the full configuration file
>>>>for your job somewhere (e.g. Github gist)?
>>>>2. Are you putting data into the positive-words and negative-words
>>>>topic before you start the Samza job?
>>>>
>>>>Also, you can do envelope.getSystemStreamPartition().getStream()
>>>>directly (no need to call getSystemStream()).
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/10/14 3:18 AM, "Garry Turkington"
>>>><g.turking...@improvedigital.com>
>>>>wrote:
>>>>
>>>>>Hi,
>>>>>
>>>>>I was building a task to do some sentiment analysis on incoming data.
>>>>>I have a corpus each of positive and negative words to which the
>>>>>task needs access. This seemed like a good fit for bootstrap
>>>>>streams. But I can't seem to get them to work.
>>>>>
>>>>>I have my job configured with the 3 Kafka topics in task.inputs and
>>>>>that seems to work, just throwing data at any of the topics is
>>>>>hitting the task.
>>>>>
>>>>>But setting up the 2 reference streams as bootstrap doesn't seem to
>>>>>be working. Here's the relevant part of the config, I want to read
>>>>>the entire message history each time:
>>>>>
>>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>>
>>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>>>
>>>>>Do bootstrap streams get handled in any special way? I'm assuming
>>>>>here that the messages will arrive in the process method on
>>>>>StreamTask just like any other and I can handle them differently by
>>>>>switching on
>>>>>envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>>>Looking at the code it looks the same with the BootstrapChooser
>>>>>doing its magic to determine which message is delivered to the task
>>>>>but the actual delivery seems the same.
>>>>>
>>>>>What am I missing?
>>>>>
>>>>>Thanks,
>>>>>Garry
>>>>>
>>>>
>>>>
>>>>-----
>>>>No virus found in this message.
>>>>Checked by AVG - www.avg.com
>>>>Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date:
>>>>02/10/14
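
For reference, the two-system setup Chris describes above (one system for the bootstrap streams, one for the tweet stream) might look roughly like the fragment below. This is only a sketch under assumptions: the system name "kafkawords" and the ZooKeeper and broker addresses are hypothetical and do not come from the job config in this thread, the topic name "tweets" is inferred from the mention of kafka.tweets, and serde settings are omitted. The keys follow the systems.<system>.consumer.* and systems.<system>.streams.<stream>.samza.* patterns already used in the thread.

```properties
# Existing system for the tweet stream; no auto.offset.reset override here.
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.metadata.broker.list=localhost:9092

# Second system (hypothetical name "kafkawords") pointing at the same cluster.
# Only this system resets to the smallest offset.
systems.kafkawords.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafkawords.consumer.zookeeper.connect=localhost:2181
systems.kafkawords.producer.metadata.broker.list=localhost:9092
systems.kafkawords.consumer.auto.offset.reset=smallest

# Bootstrap settings move to the new system name.
systems.kafkawords.streams.positive-words.samza.bootstrap=true
systems.kafkawords.streams.positive-words.samza.reset.offset=true
systems.kafkawords.streams.negative-words.samza.bootstrap=true
systems.kafkawords.streams.negative-words.samza.reset.offset=true

# Inputs reference each topic through the system that should read it.
task.inputs=kafka.tweets,kafkawords.positive-words,kafkawords.negative-words
```

With a split like this, the word topics would be read from the beginning on each run while the tweet stream keeps the default reset behaviour. It does not by itself address the useBootstrapping=false line in the container log.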