Hi all,

After working with the Streams project a bit, I have noticed some of the
same issues that Matt and Ryan have brought up. I think that Matt's idea
to implement two interfaces (Producer, Listener) would make a great
addition to the project. Not only would it increase efficiency but it
would also, in my opinion, make the streams themselves easier to construct
and understand.
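
To make this concrete, here is a minimal, single-threaded sketch of how I
read the proposal quoted below. It is only an illustration under my own
assumptions: Datum is a stand-in for StreamsDatum, and BaseProducer,
ListReader, UpperCaseProcessor, CollectingWriter and PushDemo are names I
made up for the example. None of this is existing Streams code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Stand-in for StreamsDatum (assumption): it only carries a payload here.
final class Datum {
    final Object document;
    Datum(Object document) { this.document = document; }
}

// Anything that can accept a datum pushed from upstream.
interface Listener {
    void receive(Datum datum);
}

// Anything that can push datums to the listeners registered on it.
interface Producer {
    void registerListener(Listener listener);
    void push(Datum datum);
}

// Hypothetical helper holding the shared fan-out logic.
abstract class BaseProducer implements Producer {
    private final List<Listener> listeners = new ArrayList<>();

    @Override
    public void registerListener(Listener listener) { listeners.add(listener); }

    @Override
    public void push(Datum datum) {
        for (Listener listener : listeners) {
            listener.receive(datum);
        }
    }
}

// "Reader implements Producer": a finite source that pushes every item once.
final class ListReader extends BaseProducer {
    private final List<String> source;
    ListReader(List<String> source) { this.source = source; }

    void start() {
        for (String item : source) {
            push(new Datum(item));
        }
    }
}

// "Processor implements Producer, Listener": transforms and forwards.
final class UpperCaseProcessor extends BaseProducer implements Listener {
    @Override
    public void receive(Datum datum) {
        push(new Datum(datum.document.toString().toUpperCase(Locale.ROOT)));
    }
}

// "Writer implements Listener": the end of the line, collects what arrives.
final class CollectingWriter implements Listener {
    final List<String> written = new ArrayList<>();

    @Override
    public void receive(Datum datum) { written.add(datum.document.toString()); }
}

// Wires reader -> processor -> writer, roughly what a builder would do.
final class PushDemo {
    static List<String> run() {
        ListReader reader = new ListReader(Arrays.asList("a", "b"));
        UpperCaseProcessor processor = new UpperCaseProcessor();
        CollectingWriter writer = new CollectingWriter();
        reader.registerListener(processor);
        processor.registerListener(writer);
        reader.start();
        return writer.written;
    }
}
```

Nothing here polls: each datum travels from reader to writer as soon as it
is pushed. A real run-time would presumably put queues and threads between
the stages, which this sketch leaves out on purpose.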

-- Robert

On 5/7/14, 1:41 PM, "Matthew Hager [W2O Digital]" <mha...@w2odigital.com>
wrote:

>Good Day!
>
>I would like to throw in my two cents on this if it pleases the
>community.
>
>Here are my thoughts based on implementations that I have written with
>streams to ensure timely, high yield execution. Personally, I had to
>override much of the LocalStreamsBuilder to fit my use cases for many of
>the problems described below, though from the opposite direction. I have
>a 'finite' stream modality whose execution is hindered by being 'polled'
>the way it currently is. This is further complicated by the excessive
>waiting caused by the current 'shutdown' that exists.
>
>There are essentially two major use-cases, that I can see, that are likely
>to take place. The first is a perpetual stream, that is technically never
>satisfied. The second, is the case of a finite stream (HDFS reader, S3
>reader, pulling a user's time-line, etc...) that has a definitive start
>and end. To solve these two models of execution here are my thoughts.
>
>StreamsResultSet - I actually found this to be quite a useful paradigm. A
>queue prevents a buffer overflow, an iterator makes it fun and easy to
>read (I love iterators), and it is simple and succinct. I do, however,
>feel it is best expressed as an interface instead of a class. Personally I
>had to override almost every function to fit the concept of a 'finite'
>stream without an expensive tear-down cost. The thing missing from this,
>as an interface, would be the notion of "isRunning" (as Ryan suggested),
>which could easily satisfy both of the aforementioned modalities. I
>actually have a reference implementation of this for finite streams if
>anyone would like to see it or use it.
>
>Event Driven - I concur with Matt 100% on this. As currently implemented,
>LocalStreamsBuilder is exceedingly inefficient from both a memory and an
>execution-time perspective. It seems to me that we could abstract out two
>common interfaces to make this happen.
>
>       * Listener { receive(StreamsDatum); }
>       * Producer { push(StreamsDatum); registerListener(Listener); }
>
>Where the following implementations would take place:
>
>       * Reader implements Producer
>       * Processor implements Producer, Listener
>       * Writer implements Listener
>
>In the reference implementations, you can still have queues in place that
>could actually function as meaningful indicators of system performance
>and status, i.e., the queue functions as, well, an actual queue, and
>processes are much more asynchronous than they are now. Then,
>LocalStreamsBuilder strings all the guys up together in their nice little
>workflows and the events just shoot the little Datums down their paths
>until they wind up wherever they are supposed to go as quickly as
>possible.
>
>Pardon the long response; I tend to be wordy. Great discussion, and thanks
>to everyone for indulging my thoughts!
>
>
>Cheers!
>Smashew (Matthew Hager)
>
>
>
>Matthew Hager
>Director - Data Sciences Software
>
>W2O Digital
>3000 E Cesar Chavez St., Suite 300, Austin, Texas 78702
>direct 512.551.0891 | cell 512.949.9603
>twitter iSmashew | linkedin Matthew Hager
>
>
>
>
>On 5/6/14, 10:58 AM, "Steve Blackmon" <sblack...@apache.org> wrote:
>
>>On Tue, May 6, 2014 at 8:24 AM, Matt Franklin <m.ben.frank...@gmail.com>
>>wrote:
>>> On Mon, May 5, 2014 at 1:15 PM, Steve Blackmon <sblack...@apache.org>
>>>wrote:
>>>
>>>> What I meant to say re #1 below is that batch-level metadata could be
>>>> useful for modules downstream of the StreamsProvider /
>>>> StreamsPersistReader, and the StreamsResultSet gives us a class to
>>>> which we can add new metadata in core as the project evolves, or
>>>> supplement on a per-module or per-implementation basis via
>>>> subclassing.  Within a provider there's no need to modify or extend
>>>> StreamsResultSet to maintain and utilize state from a third-party API.
>>>>
>>>
>>> I agree that in batch mode, metadata might be important.  In
>>> conversations with other people, I think what might be missing is a
>>> completely reactive, event-driven mode where a provider pushes to the
>>> rest of the stream rather than getting polled.
>>>
>>
>>That would certainly be nice, but I see it as primarily a run-time
>>concern.  We should add additional methods to the core interfaces if
>>we need them to make a push run-time (backed by camel, nsq, activemq,
>>0mq, etc...) work, but let's stay vigilant to keep the number of
>>methods on those interfaces to a minimum so we don't end up with a)
>>classes that do a lot of stuff in core b) an effective partition
>>between methods necessary for perpetual and batch modes c) lots of
>>modules that implement just one or the other.  Modules that don't
>>implement all run-modes are already a problem.
>>
>>So who wants to volunteer to write a push-based run-time module?
>>
>>>
>>>>
>>>> I think I would support making StreamsResultSet an interface rather
>>>> than a class.
>>>>
>>>
>>> +1 on interface
>>>
>>>
>>>>
>>>> Steve Blackmon
>>>> sblack...@apache.org
>>>>
>>>> On Mon, May 5, 2014 at 12:07 PM, Steve Blackmon <st...@blackmon.org>
>>>> wrote:
>>>> > Comments on this in-line below.
>>>> >
>>>> > On Thu, May 1, 2014 at 4:38 PM, Ryan Ebanks <ryaneba...@gmail.com>
>>>> wrote:
>>>>> >> The use and implementations of the StreamsProviders seem to have
>>>>> >> drifted away from what it was originally designed for.  I recommend
>>>>> >> that we change the StreamsProvider interface and
>>>>> >> StreamsProviderTask to reflect the current usage patterns and to be
>>>>> >> more efficient.
>>>> >>
>>>> >> Current Problems:
>>>> >>
>>>>> >> 1.) newPerpetualStream in LocalStreamsBuilder is not perpetual.  The
>>>>> >> StreamsProviderTask will shut down after a certain number of empty
>>>>> >> returns from the provider.  A perpetual stream implies that it will
>>>>> >> run in perpetuity.  If I open a Twitter Gardenhose that is returning
>>>>> >> tweets with obscure key words, I don't want my stream shutting down
>>>>> >> if it is just quiet for a few time periods.
>>>> >>
>>>>> >> 2.) StreamsProviderTask assumes that a single read* will return all
>>>>> >> the data for that request.  This means that if I do a readRange for
>>>>> >> a year, the provider has to hold all of that data in memory and
>>>>> >> return it as one StreamsResultSet.  I believe the readPerpetual was
>>>>> >> designed to get around this problem.
>>>> >>
>>>> >> Proposed Fixes/Changes:
>>>> >>
>>>>> >> Fix 1.) Remove the StreamsResultSet.  No implementations in the
>>>>> >> project currently use it for anything other than a wrapper around a
>>>>> >> Queue that is then iterated over.  StreamsProvider will now return a
>>>>> >> Queue<StreamsDatum> instead of a StreamsResultSet.  This will allow
>>>>> >> providers to queue data as they receive it, and the
>>>>> >> StreamsProviderTask can pop them off as soon as they are available.
>>>>> >> It will help fix problem #2, as well as help to lower memory usage.
>>>> >>
>>>> >
>>>>> > I'm not convinced this is a good idea.  StreamsResultSet is a useful
>>>>> > abstraction even if no modules are using it as more than a wrapper
>>>>> > for Queue at the moment.  For example read* in a provider or
>>>>> > persistReader could return batch-level (as opposed to datum-level)
>>>>> > metadata from the underlying API which would be useful state for the
>>>>> > provider.  Switching to Queue would eliminate our ability to add
>>>>> > those capabilities at the core level or at the module level.
>>>> >
>>>>> >> Fix 2.) Add a method, public boolean isRunning(), to the
>>>>> >> StreamsProvider interface.  The StreamsProviderTask can call this
>>>>> >> function to see if the provider is still operating.  This will help
>>>>> >> fix problems #1 and #2.  This will allow the provider to run
>>>>> >> multithreaded, queue data as it's available, and notify the task
>>>>> >> when it's done so that it can be closed down properly.  It will also
>>>>> >> allow the stream to be run in perpetuity as the StreamTask won't
>>>>> >> shut down providers that have not been producing data for a while.
>>>> >
>>>> > I think this is a good idea.  +1
>>>> >
>>>>> >> Right now the StreamsProvider and StreamsProviderTask seem to be
>>>>> >> full of short term fixes that need to be redesigned into long term
>>>>> >> solutions.  With enough positive feedback, I will create Jira tasks,
>>>>> >> a feature branch, and begin work.
>>>> >>
>>>> >> Sincerely,
>>>> >> Ryan Ebanks
>>>>
>
