What I meant to say re #1 below is that batch-level metadata could be useful to modules downstream of the StreamsProvider / StreamsPersistReader. StreamsResultSet gives us a class to which we can add new metadata in core as the project evolves, or which can be supplemented on a per-module or per-implementation basis via subclassing. Within a provider there's no need to modify or extend StreamsResultSet to maintain and use state from a third-party API.
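A minimal sketch of the subclassing idea in the paragraph above, in Java. Everything here is hypothetical and simplified: Datum stands in for StreamsDatum, BatchResultSet is a queue-wrapping result set rather than the real StreamsResultSet, and the cursor and rate-limit fields are invented examples of batch-level metadata a third-party API might return.

```java
import java.util.Iterator;
import java.util.Queue;

// Hypothetical stand-in for StreamsDatum: an id plus an arbitrary document.
class Datum {
    final String id;
    final Object document;

    Datum(String id, Object document) {
        this.id = id;
        this.document = document;
    }
}

// Simplified result set: essentially a wrapper around a Queue that callers
// iterate over, which is how the thread describes current usage.
class BatchResultSet implements Iterable<Datum> {
    private final Queue<Datum> queue;

    BatchResultSet(Queue<Datum> queue) {
        this.queue = queue;
    }

    Queue<Datum> getQueue() {
        return queue;
    }

    @Override
    public Iterator<Datum> iterator() {
        return queue.iterator();
    }
}

// Module-specific subclass adding batch-level (not datum-level) metadata
// reported by a hypothetical paged third-party API.
class PagedResultSet extends BatchResultSet {
    private final String nextCursor; // null when the API reports no further pages
    private final int rateLimitRemaining;

    PagedResultSet(Queue<Datum> queue, String nextCursor, int rateLimitRemaining) {
        super(queue);
        this.nextCursor = nextCursor;
        this.rateLimitRemaining = rateLimitRemaining;
    }

    boolean hasMorePages() {
        return nextCursor != null;
    }

    String getNextCursor() {
        return nextCursor;
    }

    int getRateLimitRemaining() {
        return rateLimitRemaining;
    }
}
```

A downstream module that only knows the base type keeps iterating over the data as before, while a module that knows about PagedResultSet can check instanceof and read the extra fields. The provider's own handling of API state does not have to change either way.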
I think I would support making StreamsResultSet an interface rather than a class.

Steve Blackmon
sblack...@apache.org

On Mon, May 5, 2014 at 12:07 PM, Steve Blackmon <st...@blackmon.org> wrote:
> Comments on this in-line below.
>
> On Thu, May 1, 2014 at 4:38 PM, Ryan Ebanks <ryaneba...@gmail.com> wrote:
>> The use and implementations of the StreamsProviders seem to have drifted
>> away from what they were originally designed for. I recommend that we change
>> the StreamsProvider interface and StreamsProviderTask to reflect the
>> current usage patterns and to be more efficient.
>>
>> Current Problems:
>>
>> 1.) newPerpetualStream in LocalStreamBuilder is not perpetual. The
>> StreamsProviderTask will shut down after a certain number of empty returns
>> from the provider. A perpetual stream implies that it will run in
>> perpetuity. If I open a Twitter Gardenhose that is returning tweets with
>> obscure keywords, I don't want my stream shutting down just because it is
>> quiet for a few time periods.
>>
>> 2.) StreamsProviderTask assumes that a single read* call will return all the
>> data for that request. This means that if I do a readRange for a year, the
>> provider has to hold all of that data in memory and return it as one
>> StreamsResultSet. I believe readPerpetual was designed to get around
>> this problem.
>>
>> Proposed Fixes/Changes:
>>
>> Fix 1.) Remove the StreamsResultSet. No implementations in the project
>> currently use it for anything other than a wrapper around a Queue that is
>> then iterated over. StreamsProvider will now return a Queue<StreamsDatum>
>> instead of a StreamsResultSet. This will allow providers to queue data as
>> they receive it, and the StreamsProviderTask can pop items off as soon as
>> they are available. It will help fix problem #2, as well as help to lower
>> memory usage.
>>
>
> I'm not convinced this is a good idea.
> StreamsResultSet is a useful abstraction even if no modules are using it
> as more than a wrapper for Queue at the moment. For example, read* in a
> provider or persistReader could return batch-level (as opposed to
> datum-level) metadata from the underlying API, which would be useful
> state for the provider. Switching to Queue would eliminate our ability
> to add those capabilities at the core level or at the module level.
>
>> Fix 2.) Add a method, public boolean isRunning(), to the StreamsProvider
>> interface. The StreamsProviderTask can call this method to see if the
>> provider is still operating. This will help fix problems #1 and #2. It
>> will allow the provider to run multithreaded, queue data as it's available,
>> and notify the task when it's done so that it can be shut down properly.
>> It will also allow the stream to run in perpetuity, as the StreamsProviderTask
>> won't shut down providers that have not been producing data for a while.
>>
>
> I think this is a good idea. +1
>
>> Right now the StreamsProvider and StreamsProviderTask seem to be full of
>> short-term fixes that need to be redesigned into long-term solutions. With
>> enough positive feedback, I will create Jira tasks, a feature branch, and
>> begin work.
>>
>> Sincerely,
>> Ryan Ebanks
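A rough sketch of how the proposed queue and isRunning() ideas could fit together, assuming a provider that fills a concurrent queue from its own thread. QueueingProvider, ListBackedProvider, and ProviderTaskSketch are hypothetical names, not the Streams API, and String stands in for StreamsDatum. The task loop stops only when the provider reports it is no longer running and the queue has been drained, rather than after a fixed number of empty polls.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical provider contract: hand the task a queue that fills as data
// arrives, and report whether the provider is still running.
interface QueueingProvider {
    void startStream();

    Queue<String> getQueue();

    boolean isRunning();
}

// Toy provider: a background thread offers items one at a time, then clears
// the running flag once its (finite) source is exhausted.
class ListBackedProvider implements QueueingProvider {
    private final List<String> source;
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(false);

    ListBackedProvider(List<String> source) {
        this.source = source;
    }

    @Override
    public void startStream() {
        running.set(true); // set before the task's first isRunning() check
        Thread producer = new Thread(() -> {
            for (String item : source) {
                queue.offer(item);
            }
            running.set(false); // cleared only after the last offer
        }, "list-backed-provider");
        producer.start();
    }

    @Override
    public Queue<String> getQueue() {
        return queue;
    }

    @Override
    public boolean isRunning() {
        return running.get();
    }
}

// Task loop: keep polling through quiet periods and stop only once the
// provider says it is done AND everything it queued has been drained.
class ProviderTaskSketch {
    static void drain(QueueingProvider provider, List<String> sink, long pollIntervalMillis) {
        provider.startStream();
        while (true) {
            // Read the flag BEFORE draining: if it is already false, every
            // offer has completed, so this drain pass is the last one needed.
            boolean stillRunning = provider.isRunning();
            String item;
            while ((item = provider.getQueue().poll()) != null) {
                sink.add(item);
            }
            if (!stillRunning) {
                return;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return;
            }
        }
    }
}
```

Because the producer clears the flag only after its last offer, and AtomicBoolean gives volatile read/write semantics, a task that sees false is guaranteed to find every remaining item in the queue, so nothing is lost on the final pass. A perpetual provider would simply never clear the flag, and the loop would keep polling through quiet periods instead of shutting down.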