Hey Garry,

Yep, this is definitely a bug. I've opened:

  https://issues.apache.org/jira/browse/SAMZA-145


I've also posted a patch, and will merge it in as soon as I get a +1 (by
tomorrow).

Cheers,
Chris

On 2/12/14 5:10 PM, "Garry Turkington" <g.turking...@improvedigital.com>
wrote:

>Hi Chris,
>
>OK, so that did have an effect. :)
>
>Adding the Kafka-level offset as 'smallest' I do indeed see messages from
>the positive and negative word streams arrive in my stream task. But they
>seem to be appearing interleaved with the other non-bootstrapped stream
>so I think I'm seeing each stream being read from the smallest offset but
>with no priority given to the bootstrapped streams. I also still see this
>in the container logs. Am I correct in assuming it should instead show
>the BootstrapChooser being selected?
>
>DefaultChooser [INFO] Building default chooser with: useBatching=false,
>useBootstrapping=false, usePriority=false
>
>Thanks for the help with this, hugely appreciated!
>
>Garry
>
>
>-----Original Message-----
>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>Sent: 12 February 2014 22:07
>To: dev@samza.incubator.apache.org
>Subject: Re: Using bootstrap streams
>
>Hey Garry,
>
>I've opened:
>
>  https://issues.apache.org/jira/browse/SAMZA-144
>
>To track the stream-level Kafka configuration override issue.
>
>Let me know if the "smallest" setting works for you.
>
>Cheers,
>Chris
>
>On 2/12/14 11:21 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>
>>Hey Garry,
>>
>>I just noticed that we don't actually support stream-level overrides
>>for Kafka configs.
>>
>>To test this, you'll have to set the consumer settings at the system
>>level:
>>
>>systems.kafka.consumer.auto.offset.reset=smallest
>>
>>
>>Note that this will cause you to read all data from kafka.tweets the
>>first time you run your job, as well. I think this is probably what you
>>want, but if not, you'd have to define two systems: one for the
>>bootstrap streams, and one for the tweet stream, so that you could
>>configure the bootstrap system to have the "smallest" reset setting.
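
For reference, the two-system arrangement described above might look roughly like the fragment below. This is only a sketch: the system name `kafka-bootstrap` is hypothetical, the ZooKeeper address is a placeholder, and the remaining producer and serde settings are left out.

```properties
# Sketch only: "kafka-bootstrap" is a hypothetical system name, and the
# ZooKeeper address is a placeholder for your own cluster.

# Existing system for the tweet stream; no "smallest" reset setting here.
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181

# Second system pointing at the same cluster, used only for the word lists.
systems.kafka-bootstrap.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka-bootstrap.consumer.zookeeper.connect=localhost:2181
systems.kafka-bootstrap.consumer.auto.offset.reset=smallest
systems.kafka-bootstrap.streams.positive-words.samza.bootstrap=true
systems.kafka-bootstrap.streams.negative-words.samza.bootstrap=true

task.inputs=kafka.tweets,kafka-bootstrap.positive-words,kafka-bootstrap.negative-words
```

With this split, only the streams read through `kafka-bootstrap` would use the "smallest" reset setting.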
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 11:06 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>>
>>>Hey Garry,
>>>
>>>I believe this is a similar issue to SAMZA-142.
>>>
>>>Can you try adding a config to set auto.offset.reset to smallest?
>>>Something like:
>>>
>>>  systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>>>  systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>>>
>>>
>>>This should change this log line:
>>>
>>>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>>>
>>>
>>>To something like:
>>>
>>>Final offset to be returned for Topic and Partition [positive-words,0] = 0
>>>
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/12/14 10:44 AM, "Garry Turkington"
>>><g.turking...@improvedigital.com>
>>>wrote:
>>>
>>>>Hi Chris,
>>>>
>>>>Sorry for the wrong log file!
>>>>
>>>>Samza container log is at:
>>>>http://pastebin.com/D5bAJd7U
>>>>
>>>>I do notice that it mentions returning the highest offset for the
>>>>supposedly bootstrapped streams, which I presume shouldn't be happening.
>>>>
>>>>Thanks,
>>>>Garry
>>>>
>>>>-----Original Message-----
>>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>>Sent: 12 February 2014 17:42
>>>>To: dev@samza.incubator.apache.org
>>>>Subject: Re: Using bootstrap streams
>>>>
>>>>Hey Garry,
>>>>
>>>>So far, everything looks normal.
>>>>
>>>>The container log you sent me actually appears to be the output
>>>>of the run-job.sh command. Since you're using YarnJobFactory, this is
>>>>not actually the container log. Could you grab the log from the
>>>>container that's running in YARN, and stick that in pastebin? You can
>>>>usually find this by going to YARN's RM (http://localhost:8088) and
>>>>finding the link to your ApplicationMaster. This will link to the
>>>>logs for each container that's running your tasks.
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/11/14 2:11 PM, "Garry Turkington"
>>>><g.turking...@improvedigital.com>
>>>>wrote:
>>>>
>>>>>Hi Chris,
>>>>>
>>>>>Following up on this, sorry for the delay, travelling this week.
>>>>>
>>>>>Main task config:
>>>>>http://pastebin.com/enQXLcbZ
>>>>>
>>>>>Container log:
>>>>>http://pastebin.com/YLiKp0CS
>>>>>
>>>>>I'm putting the positive and negative words into the bootstrap
>>>>>streams prior to running the job -- and confirmed the data is in the
>>>>>Kafka stream via kafka-console-consumer.sh with the --from-beginning
>>>>>option.
>>>>>
>>>>>Thanks for any input!
>>>>>Garry
>>>>>
>>>>>-----Original Message-----
>>>>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>>>>Sent: 10 February 2014 23:25
>>>>>To: dev@samza.incubator.apache.org
>>>>>Subject: Re: Using bootstrap streams
>>>>>
>>>>>Hey Garry,
>>>>>
>>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>>
>>>>>Bootstrap stream messages will be delivered to the process() method
>>>>>just like any other. The only difference is you're supposed to get
>>>>>all of them from 0-lastOffset before you get any messages from
>>>>>non-bootstrap streams.
>>>>>Your positive/negative example sounds like a reasonable use case for
>>>>>a bootstrap stream.
>>>>>
>>>>>A few questions:
>>>>>
>>>>>1. Can you post the container logs and the full configuration file
>>>>>for your job somewhere (e.g. Github gist)?
>>>>>2. Are you putting data into the positive-words and negative-words
>>>>>topic before you start the Samza job?
>>>>>
>>>>>Also, you can do envelope.getSystemStreamPartition().getStream()
>>>>>directly (no need to call getSystemStream()).
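
The stream-name dispatch discussed above can be sketched without any Samza dependency. In the sketch below, `SentimentRouter` and its `handle` method are hypothetical names, not part of Samza; the stream names come from the config in this thread, and messages are assumed to be plain strings.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper, not part of Samza: routes messages by stream name. */
final class SentimentRouter {
    private final Set<String> positive = new HashSet<>();
    private final Set<String> negative = new HashSet<>();

    /**
     * Handles one message. Words from the two bootstrap streams are stored
     * and yield 0; any other stream's message is scored against the stored
     * words (+1 per positive word, -1 per negative word).
     */
    int handle(String stream, String message) {
        switch (stream) {
            case "positive-words":
                positive.add(normalize(message));
                return 0;
            case "negative-words":
                negative.add(normalize(message));
                return 0;
            default:
                return score(message);
        }
    }

    private int score(String text) {
        int score = 0;
        // Split on runs of non-word characters to get rough tokens.
        for (String token : text.split("\\W+")) {
            String word = normalize(token);
            if (positive.contains(word)) score++;
            if (negative.contains(word)) score--;
        }
        return score;
    }

    private static String normalize(String s) {
        return s.trim().toLowerCase(Locale.ROOT);
    }
}
```

Inside a StreamTask, process() would presumably pass `envelope.getSystemStreamPartition().getStream()` and `(String) envelope.getMessage()` to `handle`, assuming a string serde is configured for all three streams. Scoring is only meaningful if the bootstrap messages really do arrive first.
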
>>>>>
>>>>>Cheers,
>>>>>Chris
>>>>>
>>>>>On 2/10/14 3:18 AM, "Garry Turkington"
>>>>><g.turking...@improvedigital.com>
>>>>>wrote:
>>>>>
>>>>>>Hi,
>>>>>>
>>>>>>I was building a task to do some sentiment analysis on incoming data.
>>>>>>I have a corpus each of positive and negative words to which the
>>>>>>task needs access. This seemed like a good fit for bootstrap
>>>>>>streams. But I can't seem to get them to work.
>>>>>>
>>>>>>I have my job configured with the 3 Kafka topics in task.inputs, and
>>>>>>that seems to work: data sent to any of the topics reaches the
>>>>>>task.
>>>>>>
>>>>>>But setting up the 2 reference streams as bootstrap doesn't seem to
>>>>>>be working. Here's the relevant part of the config; I want to read
>>>>>>the entire message history each time:
>>>>>>
>>>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>>>
>>>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>>>>
>>>>>>Do bootstrap streams get handled in any special way? I'm assuming
>>>>>>that the messages will arrive in the process method on StreamTask
>>>>>>just like any other, and that I can handle them differently by
>>>>>>switching on
>>>>>>envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>>>>From the code, the BootstrapChooser does its magic to determine
>>>>>>which message is delivered to the task, but the actual delivery
>>>>>>looks the same.
>>>>>>
>>>>>>What am I missing?
>>>>>>
>>>>>>Thanks,
>>>>>>Garry
>>>>>>
>>>>>
>>>>>
>>>>>-----
>>>>>No virus found in this message.
>>>>>Checked by AVG - www.avg.com
>>>>>Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date:
>>>>>02/10/14
>>>>
>>>>
>>>
>>
>
>
