Hi Chris,

OK, so that did have an effect. :)

After setting the Kafka-level offset reset to 'smallest', I do indeed see 
messages from the positive and negative word streams arrive in my stream task. 
But they seem to be appearing interleaved with the other, non-bootstrapped 
stream, so I think each stream is being read from the smallest offset but with 
no priority given to the bootstrapped streams. I also still see this in the 
container logs; am I correct in assuming it should be showing selection of the 
BootstrapChooser?

DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=false

Thanks for the help with this, hugely appreciated!

Garry


-----Original Message-----
From: Chris Riccomini [mailto:criccom...@linkedin.com] 
Sent: 12 February 2014 22:07
To: dev@samza.incubator.apache.org
Subject: Re: Using bootstrap streams

Hey Garry,

I've opened:

  https://issues.apache.org/jira/browse/SAMZA-144

To track the stream-level Kafka configuration override issue.

Let me know if the "smallest" setting works for you.

Cheers,
Chris

On 2/12/14 11:21 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:

>Hey Garry,
>
>I just noticed that we don't actually support stream-level overrides 
>for Kafka configs.
>
>To test this, you'll have to set the consumer settings at the system level:
>
>systems.kafka.consumer.auto.offset.reset=smallest
>
>
>Note that this will cause you to read all data from kafka.tweets the 
>first time you run your job, as well. I think this is probably what you 
>want, but if not, you'd have to define two systems: one for the 
>bootstrap streams, and one for the tweet stream, so that you could 
>configure the bootstrap system to have the "smallest" reset setting.
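The two-system layout described above can be sketched as key/value config. This is a minimal sketch, not a tested Samza configuration: the second system name kafka-bootstrap is hypothetical, and the broker/ZooKeeper connection settings it would need (presumably copied from the existing kafka system) are left out. It is written in Java only so the entries can be printed and checked.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical two-system layout: "kafka" keeps its default reset behaviour
// for the tweets stream, while "kafka-bootstrap" (an assumed name) replays the
// word lists from the smallest offset. Connection settings for the new system
// are omitted.
final class TwoSystemConfigSketch {
    static Map<String, String> build() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("systems.kafka-bootstrap.samza.factory",
                "org.apache.samza.system.kafka.KafkaSystemFactory");
        // System-level consumer override: applies to every stream of this system.
        c.put("systems.kafka-bootstrap.consumer.auto.offset.reset", "smallest");
        c.put("systems.kafka-bootstrap.streams.positive-words.samza.bootstrap", "true");
        c.put("systems.kafka-bootstrap.streams.negative-words.samza.bootstrap", "true");
        // Inputs are named <system>.<stream>; tweets stay on the original system.
        c.put("task.inputs",
                "kafka.tweets,kafka-bootstrap.positive-words,kafka-bootstrap.negative-words");
        return c;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : build().entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

Because the "smallest" override sits on the bootstrap system only, kafka.tweets would keep whatever reset behaviour the original kafka system already has.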
>
>Cheers,
>Chris
>
>On 2/12/14 11:06 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>
>>Hey Garry,
>>
>>I believe this is a similar issue to SAMZA-142.
>>
>>Can you try adding a config to set auto.offset.reset to smallest?
>>Something like:
>>
>>  systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>>  systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>>
>>
>>This should change this log line:
>>
>>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>>
>>
>>To something like:
>>
>>Final offset to be returned for Topic and Partition [positive-words,0] = 0
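A toy model of what auto.offset.reset decides may help when reading the two log lines above: with "largest" the consumer starts at the newest end of the partition, with "smallest" at the oldest retained offset. This sketch assumes 0 is the oldest retained offset and 2006 the newest (taken from the log lines); it is not Kafka's or Samza's actual offset-lookup code, and the startingOffset method is hypothetical.

```java
import java.util.Locale;

// Toy model of the auto.offset.reset choice when no usable checkpoint exists.
// Not Kafka's or Samza's implementation.
final class OffsetResetSketch {
    private OffsetResetSketch() {}

    /**
     * @param reset    value of auto.offset.reset ("smallest" or "largest")
     * @param earliest oldest offset still retained in the partition
     * @param latest   newest offset in the partition
     * @return the offset at which consumption would begin
     */
    static long startingOffset(String reset, long earliest, long latest) {
        switch (reset.toLowerCase(Locale.ROOT)) {
            case "smallest":
                return earliest; // replay everything still retained
            case "largest":
                return latest;   // skip history, only see new messages
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset value: " + reset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset("largest", 0L, 2006L));  // 2006
        System.out.println(startingOffset("smallest", 0L, 2006L)); // 0
    }
}
```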
>>
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 10:44 AM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:
>>
>>>Hi Chris,
>>>
>>>Sorry for the wrong log file!
>>>
>>>Samza container log is at:
>>>http://pastebin.com/D5bAJd7U
>>>
>>>I do notice that it mentions returning the highest offset for the 
>>>supposedly bootstrapped streams, which I presume shouldn't be happening.
>>>
>>>Thanks,
>>>Garry
>>>
>>>-----Original Message-----
>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>Sent: 12 February 2014 17:42
>>>To: dev@samza.incubator.apache.org
>>>Subject: Re: Using bootstrap streams
>>>
>>>Hey Garry,
>>>
>>>So far, everything looks normal.
>>>
>>>The container log you sent me actually appears to be the output 
>>>of the run-job.sh command. Since you're using YarnJobFactory, this is 
>>>not actually the container log. Could you grab the log from the 
>>>container that's running in YARN, and stick that in pastebin? You can 
>>>usually find this by going to YARN's RM (http://localhost:8088) and 
>>>finding the link to your ApplicationMaster. This will link to the 
>>>logs for each container that's running your tasks.
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/11/14 2:11 PM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:
>>>
>>>>Hi Chris,
>>>>
>>>>Following up on this; sorry for the delay, I'm travelling this week.
>>>>
>>>>Main task config:
>>>>http://pastebin.com/enQXLcbZ
>>>>
>>>>Container log:
>>>>http://pastebin.com/YLiKp0CS
>>>>
>>>>I'm putting the positive and negative words into the bootstrap 
>>>>streams prior to running the job, and I've confirmed the data is in the 
>>>>Kafka topics via kafka-console-consumer.sh with the --from-beginning option.
>>>>
>>>>Thanks for any input!
>>>>Garry
>>>>
>>>>-----Original Message-----
>>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>>Sent: 10 February 2014 23:25
>>>>To: dev@samza.incubator.apache.org
>>>>Subject: Re: Using bootstrap streams
>>>>
>>>>Hey Garry,
>>>>
>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>
>>>>Bootstrap stream messages will be delivered to the process() method 
>>>>just like any other. The only difference is that you're supposed to get 
>>>>all of them, from offset 0 up to lastOffset, before you get any messages 
>>>>from non-bootstrap streams.
>>>>Your positive/negative example sounds like a reasonable use case for 
>>>>a bootstrap stream.
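The ordering guarantee described above can be modelled with a small two-phase chooser: while any bootstrap stream still has messages up to its lastOffset, only bootstrap messages are delivered; after that, all streams are served round-robin. This is a simplified sketch under those assumptions, not Samza's BootstrapChooser; the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of "bootstrap streams first" delivery order.
// Not Samza's BootstrapChooser; names are hypothetical.
final class BootstrapOrderSketch {
    static final class Msg {
        final String stream;
        final long offset;
        Msg(String stream, long offset) { this.stream = stream; this.offset = offset; }
        @Override public String toString() { return stream + "@" + offset; }
    }

    static List<Msg> deliveryOrder(Map<String, Deque<Msg>> queues,
                                   Map<String, Long> bootstrapLastOffsets) {
        List<Msg> out = new ArrayList<>();
        // Phase 1: only bootstrap streams are eligible, until each reaches lastOffset.
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Map.Entry<String, Long> e : bootstrapLastOffsets.entrySet()) {
                Deque<Msg> q = queues.get(e.getKey());
                if (q != null && !q.isEmpty() && q.peekFirst().offset <= e.getValue()) {
                    out.add(q.pollFirst());
                    progressed = true;
                }
            }
        }
        // Phase 2: every stream is eligible, one message per stream per round.
        progressed = true;
        while (progressed) {
            progressed = false;
            for (Deque<Msg> q : queues.values()) {
                if (!q.isEmpty()) {
                    out.add(q.pollFirst());
                    progressed = true;
                }
            }
        }
        return out;
    }

    static Deque<Msg> queue(String stream, long from, long to) {
        Deque<Msg> q = new ArrayDeque<>();
        for (long o = from; o <= to; o++) q.addLast(new Msg(stream, o));
        return q;
    }

    static List<Msg> demoOrder() {
        Map<String, Deque<Msg>> queues = new LinkedHashMap<>();
        queues.put("tweets", queue("tweets", 0, 1));
        queues.put("positive-words", queue("positive-words", 0, 1));
        queues.put("negative-words", queue("negative-words", 0, 1));
        Map<String, Long> bootstrapLastOffsets = new LinkedHashMap<>();
        bootstrapLastOffsets.put("positive-words", 1L);
        bootstrapLastOffsets.put("negative-words", 1L);
        return deliveryOrder(queues, bootstrapLastOffsets);
    }

    public static void main(String[] args) {
        // [positive-words@0, negative-words@0, positive-words@1, negative-words@1, tweets@0, tweets@1]
        System.out.println(demoOrder());
    }
}
```

In the expected behaviour the tweets messages come last; the interleaving reported at the top of this thread corresponds to skipping phase 1 entirely.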
>>>>
>>>>A few questions:
>>>>
>>>>1. Can you post the container logs and the full configuration file 
>>>>for your job somewhere (e.g. Github gist)?
>>>>2. Are you putting data into the positive-words and negative-words 
>>>>topic before you start the Samza job?
>>>>
>>>>Also, you can do envelope.getSystemStreamPartition().getStream()
>>>>directly (no need to call getSystemStream()).
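Switching on the stream name, as discussed in this thread, might look like the following. To keep it compilable without Samza on the classpath, it takes the stream name and message as plain Strings; in a real StreamTask these would come from envelope.getSystemStreamPartition().getStream() and envelope.getMessage(). The class, the handle method, and the +1/-1 scoring rule are hypothetical illustrations.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Standalone sketch of routing by source stream, as a Samza StreamTask.process()
// might do. Samza types are left out so this compiles on its own.
final class SentimentRouterSketch {
    private final Set<String> positive = new HashSet<>();
    private final Set<String> negative = new HashSet<>();

    /**
     * @param stream  in Samza: envelope.getSystemStreamPartition().getStream()
     * @param message in Samza: envelope.getMessage(), assumed here to be a String
     * @return for tweets, positive hits minus negative hits; 0 for word-list messages
     */
    int handle(String stream, String message) {
        switch (stream) {
            case "positive-words":
                positive.add(message.trim().toLowerCase(Locale.ROOT));
                return 0;
            case "negative-words":
                negative.add(message.trim().toLowerCase(Locale.ROOT));
                return 0;
            case "tweets":
                return score(message);
            default:
                throw new IllegalArgumentException("unexpected stream: " + stream);
        }
    }

    private int score(String text) {
        int score = 0;
        for (String word : text.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (positive.contains(word)) score++;
            if (negative.contains(word)) score--;
        }
        return score;
    }

    public static void main(String[] args) {
        SentimentRouterSketch task = new SentimentRouterSketch();
        task.handle("positive-words", "great");
        task.handle("negative-words", "awful");
        System.out.println(task.handle("tweets", "Great game, awful referee, GREAT finish")); // 1
    }
}
```

If tweets arrive before the word lists have been loaded, they are scored against empty sets, which is exactly the situation bootstrap streams are meant to prevent.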
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/10/14 3:18 AM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:
>>>>
>>>>>Hi,
>>>>>
>>>>>I was building a task to do some sentiment analysis on incoming data.
>>>>>I have a corpus of positive words and one of negative words, both of 
>>>>>which the task needs access to. This seemed like a good fit for bootstrap 
>>>>>streams, but I can't seem to get them to work.
>>>>>
>>>>>I have my job configured with the 3 Kafka topics in task.inputs, and 
>>>>>that seems to work: data sent to any of the topics reaches the task.
>>>>>
>>>>>But setting up the 2 reference streams as bootstrap streams doesn't seem 
>>>>>to be working. Here's the relevant part of the config; I want to read 
>>>>>the entire message history each time:
>>>>>
>>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>>
>>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
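Since the chooser log at the top of this thread reports useBootstrapping=false, one thing worth ruling out is a mismatch between the names used in the samza.bootstrap keys and those listed in task.inputs. Below is a hypothetical helper for that check (not part of Samza), assuming neither system nor stream names contain dots.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sanity check, not part of Samza: lists streams flagged with
// samza.bootstrap=true and reports any that are missing from task.inputs.
final class BootstrapConfigCheck {
    // systems.<system>.streams.<stream>.samza.bootstrap (no dots in either name)
    private static final Pattern BOOTSTRAP_KEY =
            Pattern.compile("systems\\.([^.]+)\\.streams\\.([^.]+)\\.samza\\.bootstrap");

    /** Returns "system.stream" names whose bootstrap flag is true. */
    static Set<String> bootstrapStreams(Properties props) {
        Set<String> result = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            Matcher m = BOOTSTRAP_KEY.matcher(key);
            if (m.matches() && Boolean.parseBoolean(props.getProperty(key).trim())) {
                result.add(m.group(1) + "." + m.group(2));
            }
        }
        return result;
    }

    /** Returns bootstrap streams that do not appear in task.inputs. */
    static Set<String> missingFromInputs(Properties props) {
        Set<String> inputs = new HashSet<>();
        for (String s : props.getProperty("task.inputs", "").split(",")) {
            if (!s.trim().isEmpty()) inputs.add(s.trim());
        }
        Set<String> missing = new TreeSet<>(bootstrapStreams(props));
        missing.removeAll(inputs);
        return missing;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "task.inputs=kafka.tweets,kafka.positive-words,kafka.negative-words",
                "systems.kafka.streams.positive-words.samza.bootstrap=true",
                "systems.kafka.streams.negative-words.samza.bootstrap=true")));
        System.out.println("bootstrap: " + bootstrapStreams(props));
        System.out.println("missing from task.inputs: " + missingFromInputs(props));
    }
}
```

For the config quoted above, both word streams are reported as bootstrap streams and none are missing, so a naming mismatch would not by itself explain the useBootstrapping=false line.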
>>>>>
>>>>>Do bootstrap streams get handled in any special way? I'm assuming 
>>>>>here that the messages will arrive in the process method on 
>>>>>StreamTask just like any other, and that I can handle them differently 
>>>>>by switching on 
>>>>>envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>>>Looking at the code, the BootstrapChooser does its magic to determine 
>>>>>which message is delivered to the task, but the actual delivery seems 
>>>>>the same.
>>>>>
>>>>>What am I missing?
>>>>>
>>>>>Thanks,
>>>>>Garry
>>>>>
>>>>
>>>>
>>>
>>>
>>
>


-----
No virus found in this message.
Checked by AVG - www.avg.com
Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date: 02/10/14
