Hey Jonathan,

Also, I noticed you have "serder" instead of "serde" for your msg serde
config
(https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties).
In both cases, this should be:

systems.kafka.samza.key.serde=string

systems.kafka.samza.msg.serde=json


No 'r'. Didn't notice that before.
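
If it helps, a quick way to catch this kind of typo is to scan the job's properties for keys ending in ".serder". This is only a minimal sketch using java.util.Properties; the class name, method name, and inline config text are made up for illustration, and none of it is part of Samza.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper, not a Samza class.
class SerdeKeyCheck {
    // Returns every property key ending in ".serder" (the misspelling),
    // in sorted order so the result is deterministic.
    static List<String> findMisspelledSerdeKeys(Properties props) {
        List<String> bad = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (key.endsWith(".serder")) {
                bad.add(key);
            }
        }
        return bad;
    }

    public static void main(String[] args) throws IOException {
        // Inline stand-in for the job's .properties file (assumed contents).
        String config =
            "systems.kafka.samza.key.serder=string\n"
            + "systems.kafka.samza.msg.serde=json\n";
        Properties props = new Properties();
        props.load(new StringReader(config));
        for (String key : findMisspelledSerdeKeys(props)) {
            System.out.println("Suspicious key (did you mean .serde?): " + key);
        }
    }
}
```

In practice you would load the real file with a FileReader instead of the inline string.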

Cheers,
Chris

On 2/3/14 9:21 AM, "Chris Riccomini" <[email protected]> wrote:

>Hey Jonathan,
>
>Ah ha. This error is caused because you're sending a non-byte-array key in
>your StreamTask, and you don't have a key serializer defined. Samza, by
>default, allows you to send messages to a system without a serializer
>defined. If you do this, then it assumes you're giving it bytes.
>
>Here are the configs you need:
>
>  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
>  systems.kafka.samza.key.serder=string
>
>
>This is causing no messages to be sent, which explains why you're unable
>to see any output from your job.
>
>We could definitely improve the error messaging here. Could you open a
>JIRA for this?
>
>Cheers,
>Chris
>
>On 1/31/14 4:24 PM, "Jonathan Poltak Samosir" <[email protected]>
>wrote:
>
>>Hi Chris,
>>
>>Oh dear, that's rather embarrassing, although it makes perfect sense now.
>>
>>Here are the first ~900 lines of the second container log (that warning
>>continues to repeat):
>>https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2e14fb0e717/gistfile1.txt
>>
>>There definitely seems to be an issue there. Unfortunately, I cannot work
>>out where exactly the problem lies since the String cast error message is
>>truncated. Do you know of any easy way to get the rest of this message?
>>
>>Thank you for your patience as well. It is very much appreciated!
>>
>>Jonathan
>>
>>
>>On 31 Jan 2014, at 11:20, Chris Riccomini <[email protected]>
>>wrote:
>>
>>> Hey Jonathan,
>>> 
>>> Could you send the logs for the actual container? The AM isn't actually
>>> running your code--it just manages and tells YARN to start a SECOND
>>> container to run your code. :)
>>> 
>>> For example, in the AM logs, you'll see:
>>> 
>>> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for
>>> container container_1391135766129_0001_01_000002 on node s1
>>> (http://s1:8042/node/containerlogs/container_1391135766129_0001_01_000002).
>>> 
>>> This is the log path to the container that's running your code.
>>> 
>>> Alternatively, you can find the second container by clicking on the
>>> ApplicationMaster link in the YARN RM web UI, and then clicking on the
>>> link in the "running containers" section that ends with _000002.
>>> 
>>> If you could grab that log and send it here, I can take a look and help
>>> you figure out what's going on.
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir"
>>><[email protected]>
>>> wrote:
>>> 
>>>> Hi Chris,
>>>> 
>>>> Thanks for your thoughts.
>>>> 
>>>> 1. I will definitely put file reading on another thread, I just wanted to
>>>> get it working for now with a very simple test file before I move on and
>>>> re-implement my code to be more robust. Thanks for the pointers on
>>>> checking for space before calling BlockingEnvelopeMap.put(), also. I will
>>>> definitely come back to that afterwards.
>>>> 
>>>> 2. Again, will definitely revisit this point later on as well as the file
>>>> offset tip.
>>>> 
>>>> 3. Here are the logs:
>>>> application-master.log:
>>>> https://gist.github.com/poltak/8726030
>>>> 
>>>> gc.log (probably not needed, but it's there):
>>>> https://gist.github.com/poltak/8726004
>>>> 
>>>> stdout:
>>>> https://gist.github.com/poltak/8726036
>>>> 
>>>> stderr was clean.
>>>> 
>>>> Not sure if you will find anything useful out of those, but I'm sure your
>>>> understanding of it will greatly outweigh mine.
>>>> 
>>>> Thanks,
>>>> Jonathan
>>>> 
>>>> 
>>>> On 30 Jan 2014, at 15:47, Chris Riccomini <[email protected]>
>>>>wrote:
>>>> 
>>>>> Hey Jonathan,
>>>>> 
>>>>> Another follow-on thought. You're currently using null for the offset,
>>>>> but you could very easily use filepos, and then fseek to the filepos when
>>>>> a SystemStreamPartition is instantiated. This would give you the ability
>>>>> to "pick up where you left off" rather than re-reading the whole file
>>>>> every time your SamzaContainer starts.
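
A rough sketch of the "pick up where you left off" idea, with RandomAccessFile.seek standing in for fseek. It uses only the JDK; ResumableLineReader and LineAtOffset are hypothetical names, and how the saved position is handed back to the consumer is not shown here.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Samza class.
class ResumableLineReader {
    // A line plus the file position just past it. That position is the value
    // you would store as the message offset instead of null.
    static final class LineAtOffset {
        final String line;
        final long nextOffset;

        LineAtOffset(String line, long nextOffset) {
            this.line = line;
            this.nextOffset = nextOffset;
        }
    }

    // Seeks to startOffset and reads every remaining line.
    // RandomAccessFile.readLine maps each byte to one char, so this sketch
    // assumes ASCII input.
    static List<LineAtOffset> readFrom(Path file, long startOffset) throws IOException {
        List<LineAtOffset> out = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(startOffset);
            String line;
            while ((line = raf.readLine()) != null) {
                out.add(new LineAtOffset(line, raf.getFilePointer()));
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("medical", ".csv");
        Files.write(tmp, "a,1\nb,2\nc,3\n".getBytes(StandardCharsets.US_ASCII));

        // First run: read from the start and remember where line one ended.
        long saved = readFrom(tmp, 0L).get(0).nextOffset;

        // "Restart": resume from the saved position, skipping line one.
        for (LineAtOffset l : readFrom(tmp, saved)) {
            System.out.println(l.line + " (next offset " + l.nextOffset + ")");
        }
        Files.deleteIfExists(tmp);
    }
}
```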
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]>
>>>>>wrote:
>>>>> 
>>>>>> Hey Jonathan,
>>>>>> 
>>>>>> I took a look at your code. Nothing looks horribly wrong at first glance.
>>>>>> A couple of thoughts:
>>>>>> 
>>>>>> 1. You're reading the full file and loading it into the
>>>>>> BlockingEnvelopeMap in your start() method. This is OK if the file is
>>>>>> small, and will ALWAYS be small. If the file is NOT small, you need to
>>>>>> move the file reading to another thread, and only
>>>>>> BlockingEnvelopeMap.put() when there is space. The start/stop methods
>>>>>> would then start and stop your reader thread. The "when there is space"
>>>>>> behavior can be implemented in one of two ways: 1) check
>>>>>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>>>>>> block if it's above some threshold or 2) override
>>>>>> BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>>>>>> automatically force your reader thread to block if the queue is full.
>>>>>> 2. An alternative input path style would be to just define the input path
>>>>>> as the stream name. For example,
>>>>>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>>>>>> MedicalDataConsumer.register method call systemStreamPartition.getStream
>>>>>> to get the file path, and instantiate the file reader there. The advantage
>>>>>> to this approach is it means you only need one MedicalDataConsumer to read
>>>>>> from N files, rather than having one MedicalDataConsumer (and system) per
>>>>>> file.
>>>>>> 3. Can you send the output log when running your job?
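
To make the bounded-queue option in point 1 concrete, here is a minimal sketch using only java.util.concurrent. It does not touch any Samza class: BoundedReaderSketch and startReader are hypothetical names, and the in-memory list stands in for the lines of the file. In a real consumer, the bounded queue would be the one returned from an overridden newBlockingQueue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not a Samza class.
class BoundedReaderSketch {
    // Starts a daemon thread that feeds lines into the queue. With a bounded
    // queue, put() blocks while the queue is full, so the reader can never
    // get more than the queue's capacity ahead of the consumer.
    static Thread startReader(List<String> lines, BlockingQueue<String> queue) {
        Thread t = new Thread(() -> {
            try {
                for (String line : lines) {
                    queue.put(line);
                }
            } catch (InterruptedException e) {
                // Interrupt is the signal to stop reading early.
                Thread.currentThread().interrupt();
            }
        }, "file-reader");
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 2: at most two lines are buffered at any moment.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        List<String> lines = Arrays.asList("row1", "row2", "row3", "row4", "row5");
        Thread reader = startReader(lines, queue);

        // The consumer side: each take() frees a slot and unblocks the reader.
        for (int i = 0; i < lines.size(); i++) {
            System.out.println(queue.take());
        }
        reader.join();
    }
}
```

A stop() method would then interrupt the reader thread, which makes a blocked put() throw InterruptedException and ends the loop.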
>>>>>> 
>>>>>> 
>>>>>> Cheers,
>>>>>> Chris
>>>>>> 
>>>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir"
>>>>>> <[email protected]>
>>>>>> wrote:
>>>>>> 
>>>>>>> Thanks for the help, Chris!
>>>>>>> 
>>>>>>> The Javadocs you updated certainly did help my understanding of the
>>>>>>> SystemConsumer interface, although I still have not been able to get my
>>>>>>> System to work.
>>>>>>> 
>>>>>>> I have now resorted to trying to extend the BlockingEnvelopeMap
>>>>>>> implementation of SystemConsumer, as it seemed a lot simpler for what I
>>>>>>> want to do, plus the hello-samza example uses it so I can use it as a
>>>>>>> reference example.
>>>>>>> 
>>>>>>> Even with a call to 'put()' placed at the beginning of the 'start()'
>>>>>>> method along with a debug message, nothing is received by my simple
>>>>>>> StreamTask (which simply forwards what is received from my System's
>>>>>>> stream onto a Kafka stream for now). As this is the case, I think there
>>>>>>> is a flaw in my understanding of something much more fundamental here
>>>>>>> relating to Samza System... I am sure there is something very simple
>>>>>>> that I am missing with my implementation, as I have based it directly
>>>>>>> off the WikipediaSystem, of which I feel I have a pretty thorough
>>>>>>> understanding now.
>>>>>>> 
>>>>>>> If anyone has time to look at my implementations for SystemConsumer,
>>>>>>> SystemFactory and the previously mentioned StreamTask's config file
>>>>>>> (they're all very simple and "to-the-point", since I'm just trying to
>>>>>>> get things working), it would be very much appreciated.
>>>>>>> 
>>>>>>> SystemFactory implementation:
>>>>>>> 
>>>>>>> 
>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>>>>>> 
>>>>>>> SystemConsumer implementation:
>>>>>>> 
>>>>>>> 
>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>>>>>> 
>>>>>>> StreamTask config (which specifies input from my System):
>>>>>>> 
>>>>>>> 
>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>>>>>> 
>>>>>>> And yes, once I get it working and reading a file into a Samza Stream, I
>>>>>>> will be happy to submit a patch.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Jonathan
>>>>>>> 
>>>>>>> 
>>>>>>> On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hey Jonathan,
>>>>>>>> 
>>>>>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>>>>>> 
>>>>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>>>>>> 
>>>>>>>> Let me know if anything doesn't make sense.
>>>>>>>> 
>>>>>>>> Also, it'd be awesome if you could contribute your file reader
>>>>>>>> SystemConsumer. This seems like it'd be really useful for a lot of
>>>>>>>> things.
>>>>>>>> Could you open a JIRA and submit a patch when you're ready?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>> 
>>>>>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>>>>>>> <[email protected]> wrote:
>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> 
>>>>>>>>> Currently trying to write a simple Samza System that reads from a file
>>>>>>>>> and puts the contents onto a Samza-compatible stream. Been basing it off
>>>>>>>>> the hello-samza WikipediaSystem example so far. The SystemFactory
>>>>>>>>> implementation seemed to be pretty straightforward (get path to file from
>>>>>>>>> config and return a SystemConsumer that reads the file at that path),
>>>>>>>>> although I am not 100% sure of what the purpose of all the SystemConsumer
>>>>>>>>> interface methods is.
>>>>>>>>> 
>>>>>>>>> start() and stop() seem fairly self-explanatory, getting called when the
>>>>>>>>> System is started and stopped, respectively (please let me know if I am
>>>>>>>>> wrong about any of my understandings). register() seems to be similar to
>>>>>>>>> start() in that it will be called near the beginning of the System,
>>>>>>>>> although giving access to the SystemStreamPartition and a given partition
>>>>>>>>> offset, correct? The poll() method seems to be where most of the action
>>>>>>>>> takes place, and going by its name, it is called often or at specified
>>>>>>>>> intervals? If so, how does this polling work and how is the interval
>>>>>>>>> specified?
>>>>>>>>> Also, the List of IncomingMessageEnvelopes that get returned from poll():
>>>>>>>>> these are then forwarded on to their specified SystemStreamPartitions
>>>>>>>>> (first arg in the IncomingMessageEnvelope constructor)?
>>>>>>>>> 
>>>>>>>>> Anyway, thanks for your time and let me know how far off I am with my
>>>>>>>>> understanding of this (as I can't get anything working with my current
>>>>>>>>> system implementation). Will be happy to contribute back a Javadoc patch
>>>>>>>>> reflecting my amended understanding of this interface afterwards.
>>>>>>>>> 
>>>>>>>>> Jonathan
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>
>
