Hi all, After working with the Streams project a bit, I have noticed some of the same issues that Matt and Ryan have brought up. I think that Matt's idea to implement two interfaces (Producer, Listener) would make a great addition to the project. Not only would it increase efficiency but it would also, in my opinion, make the streams themselves easier to construct and understand.
-- Robert

On 5/7/14, 1:41 PM, "Matthew Hager [W2O Digital]" <mha...@w2odigital.com> wrote:

>Good Day!
>
>I would like to throw in my two cents on this if it pleases the
>community.
>
>Here are my thoughts, based on implementations that I have written with
>streams, to ensure timely, high-yield execution. Personally, I had to
>override much of the LocalStreamsBuilder to fit my use cases for many of
>the problems described below, except that mine were the opposite. I have
>a modality of a 'finite' stream whose execution is hindered when it is
>'polled' in the manner that it is. This is further complicated by the
>excessive waiting caused by the current 'shutdown' that exists.
>
>There are essentially two major use cases, as far as I can see, that are
>likely to take place. The first is a perpetual stream that is technically
>never satisfied. The second is a finite stream (HDFS reader, S3 reader,
>pulling a user's time-line, etc...) that has a definitive start and end.
>To solve these two models of execution, here are my thoughts.
>
>StreamsResultSet - I actually found this to be quite a useful paradigm. A
>queue prevents a buffer overflow, an iterator makes it fun and easy to
>read (I love iterators), and it is simple and succinct. I do, however,
>feel it is best expressed as an interface instead of a class. Personally,
>I had to override almost every function to fit the concept of a 'finite'
>stream without an expensive tear-down cost. The thing missing from this,
>as an interface, would be the notion of "isRunning", which could easily
>satisfy both of the aforementioned modalities (as Ryan suggested). I
>actually have a reference implementation of this for finite streams if
>anyone would like to see it or use it.
>
>Event Driven - I concur with Matt 100% on this. As currently implemented,
>LocalStreamsBuilder is exceedingly inefficient from both a memory
>perspective and a time-of-execution perspective.
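[Editor's note] The "StreamsResultSet as an interface, with isRunning" idea above could be sketched roughly as follows. This is a hypothetical illustration, not the project's actual API: `StreamsDatum` is stubbed out, and the names `FiniteResultSet` and `add` are invented for the example.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ResultSetSketch {

    // Minimal stand-in for the project's StreamsDatum class.
    static class StreamsDatum {
        final Object document;
        StreamsDatum(Object document) { this.document = document; }
    }

    // StreamsResultSet expressed as an interface, with the proposed
    // isRunning() flag to distinguish perpetual from finite streams.
    interface StreamsResultSet extends Iterable<StreamsDatum> {
        Queue<StreamsDatum> getQueue();
        boolean isRunning();
    }

    // Hypothetical reference implementation for a finite stream: it is
    // "running" only while the backing queue still holds datums, so there
    // is no expensive tear-down step.
    static class FiniteResultSet implements StreamsResultSet {
        private final Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<>();

        void add(StreamsDatum datum) { queue.offer(datum); }

        public Queue<StreamsDatum> getQueue() { return queue; }

        public boolean isRunning() { return !queue.isEmpty(); }

        public Iterator<StreamsDatum> iterator() { return queue.iterator(); }
    }
}
```

A perpetual implementation would instead keep isRunning() true until an external shutdown signal, even when the queue is momentarily empty.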
>It seems to me that we could abstract out two common interfaces to make
>this happen:
>
> * Listener { receive(StreamsDatum); }
> * Producer { push(StreamsDatum); registerListener(Listener); }
>
>with the following implementations:
>
> * Reader implements Producer
> * Processor implements Producer, Listener
> * Writer implements Listener
>
>In the reference implementations, you can still have queues in place that
>could actually function as meaningful indicators of system performance
>and status. I.e., the queue functions as, well, an actual queue, and
>processes are much more asynchronous than they are now. Then,
>LocalStreamsBuilder strings all the guys up together in their nice little
>workflows, and the events just shoot the little Datums down their paths
>until they wind up wherever they are supposed to go, as quickly as
>possible.
>
>Pardon the long response; I tend to be wordy. Great discussion, and
>thanks to everyone for indulging my thoughts!
>
>
>Cheers!
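[Editor's note] The two proposed interfaces and the Processor-as-both pattern can be sketched as below. This is a self-contained illustration under assumed names: `StreamsDatum` is stubbed, and `UpperCaseProcessor`/`CollectingWriter` are invented examples, not project classes.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerProducerSketch {

    // Minimal stand-in for the project's StreamsDatum class.
    static class StreamsDatum {
        final Object document;
        StreamsDatum(Object document) { this.document = document; }
    }

    interface Listener {
        void receive(StreamsDatum datum);
    }

    interface Producer {
        void push(StreamsDatum datum);
        void registerListener(Listener listener);
    }

    // A processor is both a Listener and a Producer: it receives a datum,
    // transforms it, and pushes the result to its registered listeners.
    static class UpperCaseProcessor implements Producer, Listener {
        private final List<Listener> listeners = new ArrayList<>();

        public void registerListener(Listener listener) { listeners.add(listener); }

        public void push(StreamsDatum datum) {
            for (Listener l : listeners) l.receive(datum);
        }

        public void receive(StreamsDatum datum) {
            push(new StreamsDatum(String.valueOf(datum.document).toUpperCase()));
        }
    }

    // A writer sits at the end of the chain as a pure Listener.
    static class CollectingWriter implements Listener {
        final List<String> written = new ArrayList<>();
        public void receive(StreamsDatum datum) {
            written.add(String.valueOf(datum.document));
        }
    }

    static List<String> runDemo() {
        UpperCaseProcessor processor = new UpperCaseProcessor();
        CollectingWriter writer = new CollectingWriter();
        processor.registerListener(writer);
        // A reader (a pure Producer) would call receive/push as data arrives.
        processor.receive(new StreamsDatum("hello streams"));
        return writer.written;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());  // [HELLO STREAMS]
    }
}
```

A builder such as LocalStreamsBuilder would then just wire each Producer's listeners at construction time, and datums propagate through the graph as events rather than by polling.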
>Smashew (Matthew Hager)
>
>
>
>Matthew Hager
>Director - Data Sciences Software
>
>W2O Digital
>3000 E Cesar Chavez St., Suite 300, Austin, Texas 78702
>direct 512.551.0891 | cell 512.949.9603
>twitter iSmashew | linkedin Matthew Hager
>
>
>
>
>On 5/6/14, 10:58 AM, "Steve Blackmon" <sblack...@apache.org> wrote:
>
>>On Tue, May 6, 2014 at 8:24 AM, Matt Franklin <m.ben.frank...@gmail.com>
>>wrote:
>>> On Mon, May 5, 2014 at 1:15 PM, Steve Blackmon <sblack...@apache.org>
>>>wrote:
>>>
>>>> What I meant to say re #1 below is that batch-level metadata could be
>>>> useful for modules downstream of the StreamsProvider /
>>>> StreamsPersistReader, and the StreamsResultSet gives us a class to
>>>> which we can add new metadata in core as the project evolves, or
>>>> supplement on a per-module or per-implementation basis via
>>>> subclassing. Within a provider there's no need to modify or extend
>>>> StreamsResultSet to maintain and utilize state from a third-party API.
>>>>
>>>
>>> I agree that in batch mode, metadata might be important.
>>> In conversations with other people, I think what might be missing is a
>>> completely reactive, event-driven mode where a provider pushes to the
>>> rest of the stream rather than gets polled.
>>>
>>
>>That would certainly be nice, but I see it as primarily a run-time
>>concern. We should add additional methods to the core interfaces if we
>>need them to make a push run-time (backed by camel, nsq, activemq, 0mq,
>>etc...) work, but let's stay vigilant about keeping the number of
>>methods on those interfaces to a minimum, so we don't end up with a)
>>classes that do a lot of stuff in core, b) an effective partition
>>between methods necessary for perpetual and batch modes, or c) lots of
>>modules that implement just one or the other. Modules that don't
>>implement all run-modes are already a problem.
>>
>>So who wants to volunteer to write a push-based run-time module?
>>
>>>
>>>>
>>>> I think I would support making StreamsResultSet an interface rather
>>>> than a class.
>>>>
>>>
>>> +1 on interface
>>>
>>>
>>>>
>>>> Steve Blackmon
>>>> sblack...@apache.org
>>>>
>>>> On Mon, May 5, 2014 at 12:07 PM, Steve Blackmon <st...@blackmon.org>
>>>> wrote:
>>>> > Comments on this in-line below.
>>>> >
>>>> > On Thu, May 1, 2014 at 4:38 PM, Ryan Ebanks <ryaneba...@gmail.com>
>>>> wrote:
>>>> >> The use and implementations of the StreamsProviders seem to have
>>>> >> drifted away from what they were originally designed for. I
>>>> >> recommend that we change the StreamsProvider interface and
>>>> >> StreamsProvider task to reflect the current usage patterns and to
>>>> >> be more efficient.
>>>> >>
>>>> >> Current Problems:
>>>> >>
>>>> >> 1.) newPerpetualStream in LocalStreamBuilder is not perpetual. The
>>>> >> StreamsProvider task will shut down after a certain number of empty
>>>> >> returns from the provider. A perpetual stream implies that it will
>>>> >> run in perpetuity.
>>>> >> If I open a Twitter Gardenhose that is returning tweets with
>>>> >> obscure key words, I don't want my stream shutting down just
>>>> >> because it is quiet for a few time periods.
>>>> >>
>>>> >> 2.) StreamsProviderTask assumes that a single read* will return all
>>>> >> the data for that request. This means that if I do a readRange for
>>>> >> a year, the provider has to hold all of that data in memory and
>>>> >> return it as one StreamsResultSet. I believe readPerpetual was
>>>> >> designed to get around this problem.
>>>> >>
>>>> >> Proposed Fixes/Changes:
>>>> >>
>>>> >> Fix 1.) Remove the StreamsResultSet. No implementations in the
>>>> >> project currently use it for anything other than a wrapper around a
>>>> >> Queue that is then iterated over. StreamsProvider will now return a
>>>> >> Queue<StreamsDatum> instead of a StreamsResultSet. This will allow
>>>> >> providers to queue data as they receive it, and the
>>>> >> StreamsProviderTask can pop datums off as soon as they are
>>>> >> available. It will help fix problem #2, as well as help lower
>>>> >> memory usage.
>>>> >>
>>>> >
>>>> > I'm not convinced this is a good idea. StreamsResultSet is a useful
>>>> > abstraction even if no modules are using it as more than a wrapper
>>>> > for Queue at the moment. For example, read* in a provider or
>>>> > persistReader could return batch-level (as opposed to datum-level)
>>>> > metadata from the underlying API, which would be useful state for
>>>> > the provider. Switching to Queue would eliminate our ability to add
>>>> > those capabilities at the core level or at the module level.
>>>> >
>>>> >> Fix 2.) Add a method, public boolean isRunning(), to the
>>>> >> StreamsProvider interface. The StreamsProviderTask can call this
>>>> >> function to see if the provider is still operating. This will help
>>>> >> fix problems #1 and #2.
>>>> >> This will allow the provider to run multithreaded, queue data as
>>>> >> it's available, and notify the task when it's done so that it can
>>>> >> be closed down properly. It will also allow the stream to run in
>>>> >> perpetuity, as the StreamTask won't shut down providers that have
>>>> >> not been producing data for a while.
>>>> >>
>>>> >
>>>> > I think this is a good idea. +1
>>>> >
>>>> >> Right now the StreamsProvider and StreamsProviderTask seem to be
>>>> >> full of short-term fixes that need to be redesigned into long-term
>>>> >> solutions. With enough positive feedback, I will create Jira
>>>> >> tasks, a feature branch, and begin work.
>>>> >>
>>>> >> Sincerely,
>>>> >> Ryan Ebanks
>>>> >
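[Editor's note] Ryan's Fix 2 (isRunning() on the provider, with the task draining a queue) could look roughly like this. It is a hypothetical sketch, not the project's actual code: `StreamsDatum` is stubbed, and `drain` stands in for the StreamsProviderTask loop.

```java
import java.util.List;
import java.util.Queue;

public class ProviderTaskSketch {

    // Minimal stand-in for the project's StreamsDatum class.
    static class StreamsDatum {
        final Object document;
        StreamsDatum(Object document) { this.document = document; }
    }

    // The provider queues data as it becomes available and reports its
    // own liveness, instead of the task inferring shutdown from a count
    // of empty poll results.
    interface StreamsProvider {
        Queue<StreamsDatum> getQueue();
        boolean isRunning();
    }

    // Simplified task loop: keep draining while the provider says it is
    // running or data remains, so a quiet perpetual stream is never shut
    // down prematurely. A real task would block or sleep between polls
    // rather than spin.
    static int drain(StreamsProvider provider, List<StreamsDatum> out) {
        int count = 0;
        while (provider.isRunning() || !provider.getQueue().isEmpty()) {
            StreamsDatum datum = provider.getQueue().poll();
            if (datum != null) {
                out.add(datum);
                count++;
            }
        }
        return count;
    }
}
```

A finite provider flips isRunning() to false once its source (an HDFS file, an S3 bucket, a time range) is exhausted; a perpetual provider leaves it true until an external shutdown, regardless of how long the queue stays empty.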