[ 
https://issues.apache.org/jira/browse/CALCITE-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14999798#comment-14999798
 ] 

Jesse Yates commented on CALCITE-849:
-------------------------------------

My proposal was about getting something working sooner rather than rewriting a 
large part of Calcite (later) :). Maybe it's best to implement the simpler 
solution for the moment and then file a follow-on JIRA to do it 'right'? It's 
not terribly invasive, so ripping it out later isn't that hard.

bq. each consumer would have to be aware that sometimes 'end of data' really 
means 'end of data (for now)'.

Which really isn't the general case for streams; maybe you have an 
end-of-stream marker or a set time bound, but streams are meant to be 
unbounded. What was key for me was the ability to cancel my request. The stream 
may be unbounded, but I know when my request no longer cares, so I can 
understand when "done" is correct.
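A minimal sketch of that idea (hypothetical names, not Calcite's actual API): wrap an unbounded iterator so a caller-side cancel() turns the stream's "no end" into a correct "done" for this one request.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: the underlying stream never ends, but once the
// requester cancels, hasNext() reports false and the request can finish.
class CancellableIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final AtomicBoolean cancelled = new AtomicBoolean(false);

  CancellableIterator(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  /** Called by the requester when it no longer cares about results. */
  public void cancel() {
    cancelled.set(true);
  }

  @Override public boolean hasNext() {
    // "End of data" becomes correct the moment this request is cancelled.
    return !cancelled.get() && delegate.hasNext();
  }

  @Override public T next() {
    return delegate.next();
  }
}
```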

bq. what I call a scheduler.

Yes. 

bq. Enumerable ...uses pull mode

Yeah, that is still a problem with the implementation if you want convenient 
cancelability. I was thinking about getting around this by having a 
'TimedPolicy' that leverages a shared Timer to schedule stop checks, but again 
you end up with a thread pool one way or another (in short, bleh).
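Roughly what I mean by 'TimedPolicy' (a name I'm proposing, nothing that exists in Calcite): one shared daemon scheduler flips a cancel flag after a deadline, and the pull-mode enumerator checks the flag before each fetch instead of blocking forever.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the proposed 'TimedPolicy': a single shared timer thread serves
// all queries, so there is no per-query thread - but it is still a thread.
class TimedPolicy {
  private static final ScheduledExecutorService TIMER =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "timed-policy");
        t.setDaemon(true);
        return t;
      });

  private final AtomicBoolean cancelled = new AtomicBoolean(false);

  TimedPolicy(long timeout, TimeUnit unit) {
    // Schedule a stop check: after the deadline, the flag flips.
    TIMER.schedule(() -> cancelled.set(true), timeout, unit);
  }

  /** The enumerator calls this before each fetch; false means stop. */
  boolean shouldContinue() {
    return !cancelled.get();
  }

  /** Explicit cancellation, e.g. from Statement.cancel(). */
  void cancel() {
    cancelled.set(true);
  }
}
```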

bq.  Quite often streams need push mode – for example when splitting a stream 
to two consumers based on a boolean condition, or when the producer is very 
bursty and we don't want to tie up a thread

Yeah, but that data has to live somewhere while it is being processed, right? 
Either in the Calcite queue or in the pushing queue from the stream. I assume 
the stream buffer >> Calcite client buffer, making the stream the place you 
want to push back on, rather than letting it pump data to Calcite for the final 
processing. If the stream is just waiting to send data, it's not tying up CPU - 
it's sleeping.

It just feels like a pull-based mechanism for the Calcite stream client is 
safer.

bq. My proposed API isn't push or pull, but puts control into the hands of a 
scheduler, which can then simulate either push or pull.... "work" method that 
does some work and returns when the input is empty, the output is full, or some 
quantum is exceeded:

Well, it lets the input push as much as it wants and then collaboratively 
pushes to the output. There is no waiting on either side, so it's never really 
pull. I understand it's an example, but it gets trickier if you are concerned 
with memory management on the input.

Instead, following the same idea, it could really be helpful to tie in some 
[non-blocking backpressure|http://www.reactive-streams.org/]. Now your buffer 
is collaboratively managed, so you only load the rows you want when you need 
them. Essentially, your interfaces look like:

{code}
//AKA Input
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

// AKA Output
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

// Pushback mechanism so the subscriber isn't overwhelmed before it is ready,
// and to cancel any further requests
public interface Subscription {
    public void request(long n);
    public void cancel();
}
{code}

It's nicely async, so you can schedule threads opportunistically. This is also 
very amenable to the layering approach Calcite uses now; for example, a 
producer can be wrapped in a filtering producer. It also means the stream isn't 
going to overwhelm the Calcite client with data.
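To illustrate the layering point, here is a sketch using the JDK's java.util.concurrent.Flow API (which mirrors the reactive-streams interfaces above): a filtering Publisher wraps another Publisher, much like Calcite layers enumerables today. The class name is mine, purely for illustration.

```java
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical filtering stage: wraps an upstream Publisher and only
// forwards rows matching the predicate. Demand and cancellation flow
// through the upstream Subscription unchanged.
class FilterPublisher<T> implements Flow.Publisher<T> {
  private final Flow.Publisher<T> upstream;
  private final Predicate<T> predicate;

  FilterPublisher(Flow.Publisher<T> upstream, Predicate<T> predicate) {
    this.upstream = upstream;
    this.predicate = predicate;
  }

  @Override public void subscribe(Flow.Subscriber<? super T> downstream) {
    upstream.subscribe(new Flow.Subscriber<T>() {
      private Flow.Subscription subscription;

      @Override public void onSubscribe(Flow.Subscription s) {
        this.subscription = s;
        // Downstream controls demand and cancellation directly.
        downstream.onSubscribe(s);
      }

      @Override public void onNext(T item) {
        if (predicate.test(item)) {
          downstream.onNext(item);
        } else {
          // A filtered-out row consumed one unit of demand; ask for a
          // replacement so downstream's requested count is honored.
          subscription.request(1);
        }
      }

      @Override public void onError(Throwable t) { downstream.onError(t); }
      @Override public void onComplete() { downstream.onComplete(); }
    });
  }
}
```

The design note worth calling out: because the filter requests a replacement row for each one it drops, backpressure is preserved through the layer, so stacking stages never silently starves or floods the consumer.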

bq.  I believe that a stream should act like a "topic" in JMS, so every 
consumer sees every row

That gets really tricky, depending on what you are going for. If you want to 
use the stream to see what is happening 'right now' (e.g. a real-time 
dashboard), then replaying the entire stream is exactly _not_ what you want. 
However, if you are looking to analyze some large stream of data and get a 
general result, then when do you stop? You then need to specify windowing of 
some sort. You could avoid looking at the whole stream by providing a window 
start of 'now', which should be pretty easy - my inclination is that most 
people aren't often going to be replaying the whole stream.

In the union case, you would still see every row as it comes in, but it could 
live as two queries on the stream.

> Streams/Slow iterators dont close on statement close
> ----------------------------------------------------
>
>                 Key: CALCITE-849
>                 URL: https://issues.apache.org/jira/browse/CALCITE-849
>             Project: Calcite
>          Issue Type: Bug
>            Reporter: Jesse Yates
>            Assignee: Julian Hyde
>             Fix For: 1.5.0
>
>         Attachments: calcite-849-bug.patch
>
>
> This is easily seen when querying an infinite stream with a clause that 
> cannot be matched
> {code}
> select stream PRODUCT from orders where PRODUCT LIKE 'noMatch';
> select stream * from orders where PRODUCT LIKE 'noMatch';
> {code}
> The issue arises when accessing the results in a multi-threaded context. Yes, 
> it's not a good idea (and things will break, like here). However, this case 
> feels like it ought to be an exception.
> Suppose you are accessing a stream and have a query that doesn't match 
> anything on the stream for a long time. Because of the way a ResultSet is 
> built, the call to executeQuery() will hang until the first matching result 
> is received. In that case, you might want to cancel the query because it's 
> taking so long. You also want the thing that's accessing the stream (the 
> StreamTable implementation) to cancel the querying/collection - via a call to 
> close on the passed iterator/enumerable.
> Since the first result was never generated, the ResultSet was never returned 
> to the caller. You can get around this by using a second thread and keeping a 
> handle to the creating statement. When you go to close that statement though, 
> you end up not closing the cursor (and the underlying iterables/enumerables) 
> because it never finished getting created.
> It gets even more problematic if you use select *, as the iterable doesn't 
> finish getting created in the AvaticaResultSet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
