Hey Jonathan,

Another follow-on thought. You're currently using null for the offset, but you could very easily use filepos, and then fseek to the filepos when a SystemStreamPartition is instantiated. This would give you the ability to "pick up where you left off" rather than re-reading the whole file every time your SamzaContainer starts.
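
To make the filepos idea concrete, here is a rough sketch in plain Java with no Samza classes involved. Everything named here (FilePosOffsetSketch, LineWithOffset, readFrom) is hypothetical and only for illustration. It assumes the offset is carried as a string holding a byte position, that a null offset means "start of file", and that each line is tagged with the position just past it, so that resuming from a saved offset starts at the next unread line. How this would be wired into register() and the envelope offsets in your consumer is left open.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

public class FilePosOffsetSketch {

    /** One line of the file plus the byte position just past it (hypothetical helper type). */
    public static final class LineWithOffset {
        public final String line;
        public final String nextOffset; // file position after this line, as a string

        LineWithOffset(String line, String nextOffset) {
            this.line = line;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Reads all remaining lines of the file, starting at the given offset.
     * Assumption: a null offset means nothing has been read yet, so start at byte 0.
     */
    public static List<LineWithOffset> readFrom(String path, String offset) throws IOException {
        long start = (offset == null) ? 0L : Long.parseLong(offset);
        List<LineWithOffset> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(start); // the "fseek to the filepos" step
            String line;
            // Note: RandomAccessFile.readLine maps each byte to one char, so this
            // sketch is only safe for ASCII/Latin-1 input.
            while ((line = file.readLine()) != null) {
                // getFilePointer() is now just past the line terminator.
                lines.add(new LineWithOffset(line, Long.toString(file.getFilePointer())));
            }
        }
        return lines;
    }
}
```

If each message's offset is set to its nextOffset value, then handing the last saved offset back to readFrom on restart skips everything that was already read.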

Cheers,
Chris

On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]> wrote:

>Hey Jonathan,
>
>I took a look at your code. Nothing looks horribly wrong at first glance.
>A couple of thoughts:
>
>1. You're reading the full file and loading it into the
>BlockingEnvelopeMap in your start() method. This is OK if the file is
>small, and will ALWAYS be small. If the file is NOT small, you need to
>move the file reading to another thread, and only
>BlockingEnvelopeMap.put() when there is space. The start/stop methods
>would then start and stop your reader thread. The "when there is space"
>behavior can be implemented in one of two ways: 1) check
>BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>block if it's above some threshold or 2) override
>BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>automatically force your reader thread to block if the queue is full.
>2. An alternative input path style would be to just define the input path
>as the stream name. For example,
>task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>MedicalDataConsumer.register method call systemStreamPartition.getStream
>to get the file path, and instantiate the file reader there. The advantage
>to this approach is it means you only need one MedicalDataConsumer to read
>from N number of files, rather than having one MedicalDataConsumer (and
>system) per file.
>3. Can you send the output log when running your job?
>
>Cheers,
>Chris
>
>On 1/30/14 2:56 PM, "Jonathan Poltak Samosir" <[email protected]>
>wrote:
>
>>Thanks for the help, Chris!
>>
>>The Javadocs you updated certainly did help my understanding of the
>>SystemConsumer interface, although I still have not been able to get my
>>System to work.
>>
>>I have now resorted to trying to extend the BlockingEnvelopeMap
>>implementation of SystemConsumer, as it seemed a lot simpler for what I
>>want to do plus the hello-samza example uses it so I can use it as a
>>reference example.
>>
>>Even when I simply put a call to 'put()' at the beginning of the
>>'start()' method with a debug message, nothing is received by my simple
>>StreamTask (which simply forwards what is received from my System's
>>stream onto a Kafka stream for now). As this is the case, I think there
>>is a flaw in my understanding of something much more fundamental here
>>relating to Samza System... I am sure there is something very simple that
>>I am missing with my implementation, as I have based it directly off the
>>WikipediaSystem, of which I feel I have a pretty thorough understanding
>>now.
>>
>>If anyone has time to look at my implementations for SystemConsumer,
>>SystemFactory and the previously mentioned StreamTask's config file
>>(they're all very simple and "to-the-point", since I'm just trying to
>>get things working), it would be very much appreciated.
>>
>>SystemFactory implementation:
>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>
>>SystemConsumer implementation:
>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>
>>StreamTask config (which specifies input from my System):
>>https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>
>>And yes, once I get it working and reading a file into a Samza Stream, I
>>will be happy to submit a patch.
>>
>>Thanks,
>>Jonathan
>>
>>
>>On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]>
>>wrote:
>>
>>> Hey Jonathan,
>>>
>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>
>>> Let me know if anything doesn't make sense.
>>>
>>> Also, it'd be awesome if you could contribute your file reader
>>> SystemConsumer. This seems like it'd be really useful for a lot of
>>> things. Could you open a JIRA and submit a patch when you're ready?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>> <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> Currently trying to write a simple Samza System that reads from a file
>>>> and puts the contents onto a Samza-compatible stream. Been basing it
>>>> off the hello-samza WikipediaSystem example so far. The SystemFactory
>>>> implementation seemed to be pretty straightforward (get path to file
>>>> from config and return a SystemConsumer that reads the file at that
>>>> path), although I am not 100% sure what the purpose of each of the
>>>> SystemConsumer interface methods is.
>>>>
>>>> start() and stop() seem fairly self-explanatory, getting called when
>>>> the System is started and stopped, respectively (please let me know
>>>> if I am wrong about any of my understandings). register() seems to be
>>>> similar to start() in that it will be called near the beginning of the
>>>> System, although giving access to the SystemStreamPartition and a
>>>> given partition offset, correct? The poll() method seems to be where
>>>> most of the action takes place, and going by its name, it is called
>>>> often or at specified intervals? If so, how does this polling work and
>>>> how is the interval specified?
>>>> Also, the List of IncomingMessageEnvelopes that get returned from
>>>> poll(): these are then forwarded on to their specified
>>>> SystemStreamPartitions (first arg in the IncomingMessageEnvelope
>>>> constructor)?
>>>>
>>>> Anyway, thanks for your time and let me know how far off I am with my
>>>> understanding of this (as I can't get anything working with my current
>>>> system implementation). Will be happy to contribute back a Javadoc
>>>> patch reflecting my amended understanding of this interface
>>>> afterwards.
>>>>
>>>> Jonathan
>>>
>>
>
