Hey Jonathan,

Another follow-on thought. You're currently using null for the offset, but
you could very easily use filepos, and then fseek to the filepos when a
SystemStreamPartition is instantiated. This would give you the ability to
"pick up where you left off" rather than re-reading the whole file every
time your SamzaContainer starts.
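Below is a minimal Java sketch of the filepos idea. It assumes that the offset string stored for each message is the byte position just past that line, so resuming means seeking straight to it. `RandomAccessFile.seek` plays the role of C's `fseek`. The Samza classes are deliberately left out, and the names `FileOffsetDemo`, `readFrom` and `LineRecord` are hypothetical.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class FileOffsetDemo {

    /** One line of the file plus the byte offset just past it (hypothetical type). */
    static final class LineRecord {
        final String line;
        final String nextOffset;

        LineRecord(String line, String nextOffset) {
            this.line = line;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Reads all remaining lines starting at byte position {@code offset}.
     * A null offset means "start from the beginning of the file".
     * RandomAccessFile.readLine treats each byte as one char, so ASCII input is assumed.
     */
    static List<LineRecord> readFrom(Path file, String offset) throws IOException {
        List<LineRecord> records = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            if (offset != null) {
                raf.seek(Long.parseLong(offset)); // Java's counterpart to fseek
            }
            String line;
            while ((line = raf.readLine()) != null) {
                // After readLine, the file pointer sits just past the line terminator.
                records.add(new LineRecord(line, Long.toString(raf.getFilePointer())));
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("medical", ".csv");
        Files.write(file, List.of("a,1", "b,2", "c,3"), StandardCharsets.US_ASCII);

        // First run: no saved offset, so read from the start.
        List<LineRecord> firstRun = readFrom(file, null);
        // Pretend only the first line was processed before a restart.
        String saved = firstRun.get(0).nextOffset;
        List<LineRecord> resumed = readFrom(file, saved);
        System.out.println(resumed.get(0).line); // prints b,2
        Files.delete(file);
    }
}
```

With this scheme, a restart that is handed the saved offset skips the lines that were already read. A null offset still falls back to reading the whole file.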

Cheers,
Chris

On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]> wrote:

>Hey Jonathan,
>
>I took a look at your code. Nothing looks horribly wrong at first glance.
>A couple of thoughts:
>
>1. You're reading the full file and loading it into the
>BlockingEnvelopeMap in your start() method. This is OK if the file is
>small, and will ALWAYS be small. If the file is NOT small, you need to
>move the file reading to another thread, and only
>BlockingEnvelopeMap.put() when there is space. The start/stop methods
>would then start and stop your reader thread. The "when there is space"
>behavior can be implemented in one of two ways: 1) check
>BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>block if it's above some threshold, or 2) override
>BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>automatically force your reader thread to block if the queue is full.
>2. An alternative input path style would be to just define the input path
>as the stream name. For example,
>task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>MedicalDataConsumer.register method call systemStreamPartition.getStream
>to get the file path, and instantiate the file reader there. The advantage
>to this approach is that you only need one MedicalDataConsumer to read
>from N files, rather than having one MedicalDataConsumer (and
>system) per file.
>3. Can you send the output log when running your job?
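Here is a minimal sketch of option 2) from point 1, written with only `java.util.concurrent`. A dedicated reader thread calls `put()` to add lines to an `ArrayBlockingQueue`, and `put()` blocks whenever the queue is full, so memory stays bounded however large the file is. `BoundedLineConsumer`, its `poll` method, and the `Iterator` that stands in for the file reader are hypothetical. In the real consumer, the bounded queue would come from the overridden `BlockingEnvelopeMap.newBlockingQueue`, and this sketch does not reproduce that Samza method.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedReaderDemo {

    /** Hypothetical consumer: a reader thread fills a bounded queue, poll() drains it. */
    static final class BoundedLineConsumer {
        private final BlockingQueue<String> queue;
        private final Iterator<String> source; // stands in for a line-by-line file reader
        private Thread reader;

        BoundedLineConsumer(Iterator<String> source, int capacity) {
            this.source = source;
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        /** Starts the reader thread; put() blocks whenever the queue is full. */
        void start() {
            reader = new Thread(() -> {
                try {
                    while (source.hasNext()) {
                        queue.put(source.next());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop() asked us to quit
                }
            }, "file-reader");
            reader.setDaemon(true);
            reader.start();
        }

        /** Interrupts the reader (unblocking a pending put) and waits for it to exit. */
        void stop() throws InterruptedException {
            reader.interrupt();
            reader.join();
        }

        /** Waits up to timeoutMs for one line, then takes up to maxMessages in total. */
        List<String> poll(int maxMessages, long timeoutMs) throws InterruptedException {
            List<String> batch = new ArrayList<>();
            String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch, maxMessages - 1);
            }
            return batch;
        }

        int queued() {
            return queue.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lines.add("line-" + i);
        }
        BoundedLineConsumer consumer = new BoundedLineConsumer(lines.iterator(), 10);
        consumer.start();
        Thread.sleep(100);
        // By now the reader has typically filled the queue and is blocked in put();
        // at most 10 lines are ever buffered.
        System.out.println("buffered: " + consumer.queued());

        int total = 0;
        List<String> batch;
        while (!(batch = consumer.poll(50, 200)).isEmpty()) {
            total += batch.size();
        }
        System.out.println("consumed: " + total);
        consumer.stop();
    }
}
```

Option 1) amounts to replacing the blocking `put()` with a loop that compares the queue size against a threshold (in Samza, via `BlockingEnvelopeMap.getNumMessagesInQueue`) and sleeps before reading more. The bounded queue provides the same back-pressure without that busy polling.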
>
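Here is a minimal sketch of point 2. It assumes that the system name is everything before the first dot of a `task.inputs` entry, so the remainder (the file path) becomes the stream name. A single hypothetical `MultiFileConsumer` then keeps one open reader per registered stream. Its `register` method stands in for `MedicalDataConsumer.register` calling `systemStreamPartition.getStream`. None of these classes are Samza's.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerStreamFileDemo {

    /** Splits "system.stream" at the first dot (assumed convention). */
    static String[] splitInput(String input) {
        int dot = input.indexOf('.');
        return new String[] { input.substring(0, dot), input.substring(dot + 1) };
    }

    /** Hypothetical consumer: one instance, one open reader per registered stream (file). */
    static final class MultiFileConsumer implements AutoCloseable {
        private final Map<String, BufferedReader> readers = new LinkedHashMap<>();

        /** The stream name is the file path, so register opens the file it names. */
        void register(String stream) throws IOException {
            readers.put(stream, Files.newBufferedReader(Paths.get(stream), StandardCharsets.UTF_8));
        }

        /** Reads the next line of the given stream, or null at end of file. */
        String nextLine(String stream) throws IOException {
            return readers.get(stream).readLine();
        }

        @Override
        public void close() throws IOException {
            for (BufferedReader r : readers.values()) {
                r.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path a = Files.createTempFile("a", ".csv");
        Path b = Files.createTempFile("b", ".csv");
        Files.write(a, List.of("a-first"), StandardCharsets.UTF_8);
        Files.write(b, List.of("b-first"), StandardCharsets.UTF_8);

        // Two inputs, one consumer: each file is just another stream name.
        try (MultiFileConsumer consumer = new MultiFileConsumer()) {
            for (Path p : List.of(a, b)) {
                String[] parts = splitInput("medicaldata." + p);
                consumer.register(parts[1]);
                System.out.println(parts[0] + " -> " + consumer.nextLine(parts[1]));
            }
        }
        Files.delete(a);
        Files.delete(b);
    }
}
```

Splitting at the first dot keeps dots inside the path intact, such as the one in `test.csv`.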
>
>Cheers,
>Chris
>
>On 1/30/14 2:56 PM, "Jonathan Poltak Samosir" <[email protected]>
>wrote:
>
>>Thanks for the help, Chris!
>>
>>The Javadocs you updated certainly did help my understanding of the
>>SystemConsumer interface, although I still have not been able to get my
>>System to work.
>>
>>I have now resorted to trying to extend the BlockingEnvelopeMap
>>implementation of SystemConsumer, as it seemed a lot simpler for what I
>>want to do plus the hello-samza example uses it so I can use it as a
>>reference example.
>>
>>Even when I simply put a call to 'put()' at the beginning of the 'start()'
>>method, along with a debug message, nothing is received by my simple
>>StreamTask (which just forwards whatever it receives from my System's
>>stream onto a Kafka stream for now). Since this is the case, I think I am
>>misunderstanding something much more fundamental about Samza Systems... I
>>am sure there is something very simple that I am missing in my
>>implementation, as I have based it directly on the WikipediaSystem, which
>>I feel I now understand pretty thoroughly.
>>
>>If anyone has time to look at my implementations for SystemConsumer,
>>SystemFactory and the previously mentioned StreamTask's config file
>>(they're all very simple and "to-the-point", since I'm just trying to
>>get things working), it would be very much appreciated.
>>
>>SystemFactory implementation:
>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>
>>SystemConsumer implementation:
>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>
>>StreamTask config (which specifies input from my System):
>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>
>>And yes, once I get it working and reading a file into a Samza Stream, I
>>will be happy to submit a patch.
>>
>>Thanks,
>>Jonathan
>>
>>
>>On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]>
>>wrote:
>>
>>> Hey Jonathan,
>>> 
>>> I've attempted to answer your questions by updating the Javadocs. :)
>>> 
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>> 
>>> Let me know if anything doesn't make sense.
>>> 
>>> Also, it'd be awesome if you could contribute your file reader
>>> SystemConsumer. This seems like it'd be really useful for a lot of
>>> things. Could you open a JIRA and submit a patch when you're ready?
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>> <[email protected]> wrote:
>>> 
>>>> Hello,
>>>> 
>>>> Currently trying to write a simple Samza System that reads from a file
>>>> and puts the contents onto a Samza-compatible stream. I've been basing
>>>> it off the hello-samza WikipediaSystem example so far. The SystemFactory
>>>> implementation seemed to be pretty straightforward (get the path to the
>>>> file from config and return a SystemConsumer that reads the file at
>>>> that path), although I am not 100% sure what all of the SystemConsumer
>>>> interface methods are for.
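The factory half of that description can be reduced to plain Java: read a file path from the job's configuration and hand it to a consumer. This is a sketch only. A plain `Map` stands in for Samza's config object, and the key `systems.medicaldata.file.path` and the `FileConsumer` class are hypothetical names used for illustration.

```java
import java.util.Map;

public class FactorySketch {

    /** Hypothetical consumer that only remembers which file it should read. */
    static final class FileConsumer {
        final String path;

        FileConsumer(String path) {
            this.path = path;
        }
    }

    /**
     * Looks up "systems.<systemName>.file.path" (a hypothetical key) and builds a consumer.
     * Fails fast if the key is missing, so a misconfigured job is caught at startup.
     */
    static FileConsumer getConsumer(String systemName, Map<String, String> config) {
        String key = "systems." + systemName + ".file.path";
        String path = config.get(key);
        if (path == null) {
            throw new IllegalArgumentException("Missing config: " + key);
        }
        return new FileConsumer(path);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("systems.medicaldata.file.path", "/N/u/poltak/test.csv");
        System.out.println(getConsumer("medicaldata", config).path); // prints /N/u/poltak/test.csv
    }
}
```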
>>>> 
>>>> start() and stop() seem fairly self-explanatory, getting called when
>>>> the System is started and stopped, respectively (please let me know if
>>>> I am wrong about any of my understandings). register() seems to be
>>>> similar to start() in that it will be called near the beginning of the
>>>> System, except that it gives access to the SystemStreamPartition and a
>>>> given partition offset, correct? The poll() method seems to be where
>>>> most of the action takes place; going by its name, is it called often,
>>>> or at specified intervals? If so, how does this polling work and how is
>>>> the interval specified?
>>>> Also, the List of IncomingMessageEnvelopes that gets returned from
>>>> poll(): are these then forwarded on to their specified
>>>> SystemStreamPartitions (the first arg in the IncomingMessageEnvelope
>>>> constructor)?
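To make these lifecycle questions concrete, here is a simplified, hypothetical driver loop. It is not SamzaContainer's real run loop, and `SimpleConsumer` is a trimmed stand-in rather than Samza's `SystemConsumer` interface. The sketch assumes three things:

- Partitions are registered (with a starting offset) before `start()`.
- The caller invokes `poll()` repeatedly with a timeout rather than on a fixed schedule.
- Each envelope carries the partition it belongs to, which is how the caller routes it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerLifecycleDemo {

    /** Hypothetical, trimmed-down envelope: which partition, which offset, what payload. */
    static final class Envelope {
        final String partition;
        final String offset;
        final String message;

        Envelope(String partition, String offset, String message) {
            this.partition = partition;
            this.offset = offset;
            this.message = message;
        }
    }

    /** Hypothetical stand-in for the four consumer methods discussed above. */
    interface SimpleConsumer {
        void register(String partition, String offset);
        void start();
        List<Envelope> poll(long timeoutMs);
        void stop();
    }

    /** Toy consumer that serves the same fixed messages for every registered partition. */
    static final class InMemoryConsumer implements SimpleConsumer {
        private final Deque<Envelope> pending = new ArrayDeque<>();
        private final List<String> registered = new ArrayList<>();
        private final List<String> messages;

        InMemoryConsumer(List<String> messages) {
            this.messages = messages;
        }

        public void register(String partition, String offset) {
            registered.add(partition);
        }

        public void start() {
            for (String p : registered) {
                for (int i = 0; i < messages.size(); i++) {
                    pending.add(new Envelope(p, Integer.toString(i), messages.get(i)));
                }
            }
        }

        public List<Envelope> poll(long timeoutMs) {
            List<Envelope> batch = new ArrayList<>();
            if (!pending.isEmpty()) {
                batch.add(pending.poll()); // one envelope per call, for illustration
            }
            return batch;
        }

        public void stop() {
            pending.clear();
        }
    }

    /** Driver: register, start, poll, route each envelope by its partition, stop. */
    static Map<String, List<String>> run(SimpleConsumer consumer, List<String> partitions) {
        Map<String, List<String>> routed = new LinkedHashMap<>();
        for (String p : partitions) {
            consumer.register(p, null); // null offset: no checkpoint yet
            routed.put(p, new ArrayList<>());
        }
        consumer.start();
        List<Envelope> batch;
        // A real run loop would keep polling; this toy stops at the first empty batch.
        while (!(batch = consumer.poll(100)).isEmpty()) {
            for (Envelope e : batch) {
                routed.get(e.partition).add(e.message);
            }
        }
        consumer.stop();
        return routed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> out =
            run(new InMemoryConsumer(List.of("x", "y")), List.of("file-0", "file-1"));
        System.out.println(out); // prints {file-0=[x, y], file-1=[x, y]}
    }
}
```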
>>>> 
>>>> Anyway, thanks for your time, and let me know how far off I am with my
>>>> understanding of this (as I can't get anything working with my current
>>>> System implementation). I'll be happy to contribute back a Javadoc patch
>>>> reflecting my amended understanding of this interface afterwards.
>>>> 
>>>> Jonathan
>>> 
>>
>
