What I meant to say re #1 below is that batch-level metadata could be
useful for modules downstream of the StreamsProvider /
StreamsPersistReader, and the StreamsResultSet gives us a class to
which we can add new metadata in core as the project evolves, or
supplement on a per-module or per-implementation basis via
subclassing.  Within a provider there's no need to modify or extend
StreamsResultSet to maintain and utilize state from a third-party API.

I think I would support making StreamsResultSet an interface rather
than a class.
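A minimal sketch of the interface idea in Java, using simplified stand-in types rather than the actual Apache Streams API. Here StreamsResultSet becomes an interface over a queue of datums. QueueResultSet is a hypothetical default implementation that just wraps a Queue. PagedApiResultSet is a hypothetical per-module extension that carries batch-level metadata. The totalAvailable and nextCursor fields are illustrative only.

```java
import java.util.Iterator;
import java.util.Queue;

// Simplified stand-in for StreamsDatum (hypothetical; the real class has more fields).
class StreamsDatum {
    final Object document;

    StreamsDatum(Object document) {
        this.document = document;
    }
}

// Hypothetical: StreamsResultSet declared as an interface instead of a class.
interface StreamsResultSet extends Iterable<StreamsDatum> {
    Queue<StreamsDatum> getQueue();

    // Iterating a result set walks its underlying queue.
    @Override
    default Iterator<StreamsDatum> iterator() {
        return getQueue().iterator();
    }
}

// Hypothetical default implementation: only wraps a Queue, as current modules do.
class QueueResultSet implements StreamsResultSet {
    private final Queue<StreamsDatum> queue;

    QueueResultSet(Queue<StreamsDatum> queue) {
        this.queue = queue;
    }

    @Override
    public Queue<StreamsDatum> getQueue() {
        return queue;
    }
}

// Hypothetical per-module extension that adds batch-level metadata
// from the underlying API for downstream modules to use.
class PagedApiResultSet extends QueueResultSet {
    private final long totalAvailable;
    private final String nextCursor;

    PagedApiResultSet(Queue<StreamsDatum> queue, long totalAvailable, String nextCursor) {
        super(queue);
        this.totalAvailable = totalAvailable;
        this.nextCursor = nextCursor;
    }

    long getTotalAvailable() {
        return totalAvailable;
    }

    String getNextCursor() {
        return nextCursor;
    }
}
```

Downstream code that only needs the datums can iterate any StreamsResultSet. A module that knows about a given extension can check for that subtype and read its metadata, and none of this requires the provider itself to change how it keeps its own state.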

Steve Blackmon
sblack...@apache.org

On Mon, May 5, 2014 at 12:07 PM, Steve Blackmon <st...@blackmon.org> wrote:
> Comments on this in-line below.
>
> On Thu, May 1, 2014 at 4:38 PM, Ryan Ebanks <ryaneba...@gmail.com> wrote:
>> The use and implementations of StreamsProvider seem to have drifted
>> away from what it was originally designed for.  I recommend that we change
>> the StreamsProvider interface and StreamsProviderTask to reflect the
>> current usage patterns and to be more efficient.
>>
>> Current Problems:
>>
>> 1.) newPerpetualStream in LocalStreamBuilder is not perpetual.  The
>> StreamsProviderTask will shut down after a certain number of empty returns
>> from the provider.  A perpetual stream implies that it will run in
>> perpetuity.  If I open a Twitter Gardenhose that is returning tweets with
>> obscure keywords, I don't want my stream shutting down just because it is
>> quiet for a few time periods.
>>
>> 2.) StreamsProviderTask assumes that a single read* call will return all the
>> data for that request.  This means that if I do a readRange for a year, the
>> provider has to hold all of that data in memory and return it as one
>> StreamsResultSet.  I believe readPerpetual was designed to get around
>> this problem.
>>
>> Proposed Fixes/Changes:
>>
>> Fix 1.) Remove the StreamsResultSet.  No implementations in the project
>> currently use it for anything other than a wrapper around a Queue that is
>> then iterated over.  StreamsProvider will now return a Queue<StreamsDatum>
>> instead of a StreamsResultSet.  This will allow providers to queue data as
>> they receive it, and the StreamsProviderTask can pop them off as soon as
>> they are available.  It will help fix problem #2 and lower memory
>> usage.
>>
>
> I'm not convinced this is a good idea.  StreamsResultSet is a useful
> abstraction even if no modules are using it as more than a wrapper for
> Queue at the moment.  For example, read* in a provider or persistReader
> could return batch-level (as opposed to datum-level) metadata from the
> underlying API which would be useful state for the provider.
> Switching to Queue would eliminate our ability to add those
> capabilities at the core level or at the module level.
>
>> Fix 2.) Add a method, public boolean isRunning(), to the StreamsProvider
>> interface.  The StreamsProviderTask can call this method to see if the
>> provider is still operating.  This will help fix problems #1 and #2: it
>> will allow the provider to run multithreaded, queue data as it becomes
>> available, and notify the task when it's done so that the task can shut
>> down properly.  It will also allow the stream to run in perpetuity, as the
>> StreamsProviderTask won't shut down providers that have not been producing
>> data for a while.
>>
>
> I think this is a good idea.  +1
>
>> Right now the StreamsProvider and StreamsProviderTask seem to be full of
>> short-term fixes that need to be redesigned into long-term solutions.  With
>> enough positive feedback, I will create Jira tasks and a feature branch, and
>> begin work.
>>
>> Sincerely,
>> Ryan Ebanks
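The two proposed fixes can be sketched together in Java. In this sketch the provider returns a live Queue<StreamsDatum> (Fix 1) and exposes isRunning() (Fix 2). This is a minimal sketch using simplified stand-ins. StreamsDatum, the trimmed-down StreamsProvider interface, ListProvider, and ProviderTask are all hypothetical and are not the actual Apache Streams classes or signatures. The task loop has no empty-poll counter, so it exits only once the provider reports that it has finished and its queue has been drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified stand-in for StreamsDatum (hypothetical).
class StreamsDatum {
    final Object document;

    StreamsDatum(Object document) {
        this.document = document;
    }
}

// Hypothetical, trimmed-down provider contract with the proposed isRunning().
interface StreamsProvider {
    void startStream();
    Queue<StreamsDatum> readCurrent();
    boolean isRunning();
}

// Hypothetical provider: a worker thread enqueues data as it "arrives",
// then clears the running flag once everything has been enqueued.
class ListProvider implements StreamsProvider {
    private final Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<>();
    private final List<String> items;
    private volatile boolean running = false;

    ListProvider(List<String> items) {
        this.items = items;
    }

    @Override
    public void startStream() {
        running = true;
        new Thread(() -> {
            for (String item : items) {
                queue.offer(new StreamsDatum(item));
            }
            running = false; // written only after the last datum is enqueued
        }).start();
    }

    @Override
    public Queue<StreamsDatum> readCurrent() {
        return queue;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

// Hypothetical task loop: a quiet but still-running provider is never shut down.
class ProviderTask implements Runnable {
    private final StreamsProvider provider;
    final List<StreamsDatum> emitted = new ArrayList<>();

    ProviderTask(StreamsProvider provider) {
        this.provider = provider;
    }

    @Override
    public void run() {
        provider.startStream();
        Queue<StreamsDatum> queue = provider.readCurrent();
        while (true) {
            // Read the flag BEFORE draining: if it is already false, every datum
            // was enqueued before the volatile write, so this drain sees all of it.
            boolean stillRunning = provider.isRunning();
            StreamsDatum datum;
            while ((datum = queue.poll()) != null) {
                emitted.add(datum);
            }
            if (!stillRunning) {
                return;
            }
            try {
                Thread.sleep(10); // back off while the provider is quiet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

ConcurrentLinkedQueue is unbounded. A provider that produces faster than the task drains will therefore still grow in memory. A bounded java.util.concurrent.BlockingQueue, such as ArrayBlockingQueue filled with put(), would add backpressure instead.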
