[
https://issues.apache.org/jira/browse/CALCITE-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14999798#comment-14999798
]
Jesse Yates commented on CALCITE-849:
-------------------------------------
My proposal was around trying to get something working sooner rather than
rewriting a large part of Calcite (later) :). Maybe it's best to implement the
simpler solution for the moment and then follow up with a JIRA to do it 'right'?
It's not terribly invasive, so ripping it out later isn't that hard.
bq. each consumer would have to be aware that sometimes 'end of data' really
means 'end of data (for now)'.
Which really isn't the general case for streams; maybe you have an
end-of-stream marker or a set time bound, but streams are meant to be unbounded.
What was key for me was the ability to cancel my request. The stream may be
unbounded, but I know when my request no longer cares, so I can understand
when "done" is correct.
bq. what I call a scheduler.
Yes.
bq. Enumerable ...uses pull mode
Yeah, that is still a problem with the implementation if you wanted to have
convenient cancelability. I was thinking about getting around this by having
a 'TimedPolicy' that leverages a shared Timer to schedule stop checks, but
again you end up with a thread pool one way or another (in short, bleh).
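For concreteness, the 'TimedPolicy' idea might be sketched like this (the name
and API here are assumptions on my part, not anything in Calcite): one shared
daemon Timer arms a stop check per query, and the pulling enumerator consults
the flag before each blocking fetch.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the 'TimedPolicy' idea: a single shared Timer
// schedules a stop check, and the pull loop consults the flag before
// each moveNext(). The Timer is still a thread - hence the "bleh".
class TimedPolicy {
  // One daemon timer shared by all policies, so each query doesn't
  // need its own thread.
  private static final Timer SHARED_TIMER = new Timer("timed-policy", true);

  private final AtomicBoolean cancelled = new AtomicBoolean(false);

  TimedPolicy(long timeoutMillis) {
    SHARED_TIMER.schedule(new TimerTask() {
      @Override public void run() {
        cancelled.set(true); // tell the puller to stop at its next check
      }
    }, timeoutMillis);
  }

  /** A pulling enumerator would call this before each blocking fetch. */
  boolean isCancelled() {
    return cancelled.get();
  }
}
```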
bq. Quite often streams need push mode – for example when splitting a stream
to two consumers based on a boolean condition, or when the producer is very
bursty and we don't want to tie up a thread
Yeah, but that data has to live somewhere while it is being processed, right?
Either in the Calcite queue or in the stream's pushing queue. I assume the
stream buffer >> the Calcite client buffer, making the stream the place you
want to push back on, rather than letting the stream pump data to Calcite to
do the final processing. If the stream is just waiting to send its data, it's
not tying up CPU - it's sleeping.
It just feels like a pull-based mechanism for the Calcite stream client is
safer.
bq. My proposed API isn't push or pull, but puts control into the hands of a
scheduler, which can then simulate either push or pull.... "work" method that
does some work and returns when the input is empty, the output is full, or some
quantum is exceeded:
Well, it lets the input push as much as it wants and then collaboratively
pushes to the output. There is no waiting on either side, so it's never
really pull. I understand it's an example, but it starts to get trickier if
you are concerned with memory management on the input.
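For concreteness, the quoted "work" contract might be sketched like this (all
names here are hypothetical, not Calcite's actual API): a scheduler repeatedly
calls work(), which processes at most a quantum of rows and reports why it
stopped, letting the scheduler decide when to run the operator again.

```java
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical sketch of a scheduler-driven operator: work() returns
// when the input is empty, the output is full, or the quantum is spent.
final class FilterOperator<T> {
  enum Status { INPUT_EMPTY, OUTPUT_FULL, QUANTUM_EXCEEDED }

  private final Queue<T> input;
  private final Queue<T> output;
  private final int outputCapacity;
  private final Predicate<T> predicate;

  FilterOperator(Queue<T> input, Queue<T> output, int outputCapacity,
      Predicate<T> predicate) {
    this.input = input;
    this.output = output;
    this.outputCapacity = outputCapacity;
    this.predicate = predicate;
  }

  /** Process at most {@code quantum} rows; report why we stopped. */
  Status work(int quantum) {
    int done = 0;
    while (done < quantum) {
      if (input.isEmpty()) return Status.INPUT_EMPTY;
      if (output.size() >= outputCapacity) return Status.OUTPUT_FULL;
      T row = input.poll();
      if (predicate.test(row)) output.add(row);
      done++;
    }
    return Status.QUANTUM_EXCEEDED;
  }
}
```

Note there is still no waiting here: whoever fills the input queue is
unconstrained, which is the memory-management concern above.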
Instead, following the same idea, it could be really helpful to tie in some
[non-blocking backpressure|http://www.reactive-streams.org/]. Now your buffer
is 'collaboratively' managed, so you only load the rows you want when you need
them. Essentially, your interfaces look like:
{code}
// AKA Input
public interface Publisher<T> {
  public void subscribe(Subscriber<? super T> s);
}

// AKA Output
public interface Subscriber<T> {
  public void onSubscribe(Subscription s);
  public void onNext(T t);
  public void onError(Throwable t);
  public void onComplete();
}

// Pushback mechanism: don't overwhelm the subscriber before it is ready,
// and cancel any further requests
public interface Subscription {
  public void request(long n);
  public void cancel();
}
{code}
It's nicely async, so you can schedule threads opportunistically. This is also
very amenable to the layering approach Calcite uses now; for example, a
producer can be wrapped in a filtering producer. It also means the stream isn't
going to overwhelm the Calcite client with data.
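As a sketch of that layering point (using the JDK's java.util.concurrent.Flow
mirror of these interfaces; the FilterPublisher name and the simplified
synchronous source are my own illustration, not Calcite code): a producer is
wrapped in a filtering producer, and each dropped row is compensated with
request(1) upstream so downstream demand is still satisfied.

```java
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical filtering wrapper: forwards matching rows downstream,
// passes backpressure (request/cancel) through unchanged.
class FilterPublisher<T> implements Flow.Publisher<T> {
  private final Flow.Publisher<T> upstream;
  private final Predicate<T> predicate;

  FilterPublisher(Flow.Publisher<T> upstream, Predicate<T> predicate) {
    this.upstream = upstream;
    this.predicate = predicate;
  }

  @Override public void subscribe(Flow.Subscriber<? super T> downstream) {
    upstream.subscribe(new Flow.Subscriber<T>() {
      private Flow.Subscription subscription;

      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        downstream.onSubscribe(s); // downstream drives demand directly
      }

      @Override public void onNext(T item) {
        if (predicate.test(item)) {
          downstream.onNext(item);
        } else {
          subscription.request(1); // replace the dropped row upstream
        }
      }

      @Override public void onError(Throwable t) { downstream.onError(t); }
      @Override public void onComplete() { downstream.onComplete(); }
    });
  }

  // Simplified synchronous source for illustration; a real source would
  // honor demand asynchronously.
  static <T> Flow.Publisher<T> fromList(List<T> items) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private int index = 0;
      private boolean done = false;

      @Override public void request(long n) {
        while (n-- > 0 && !done) {
          if (index < items.size()) {
            subscriber.onNext(items.get(index++));
          } else {
            done = true;
            subscriber.onComplete();
          }
        }
      }

      @Override public void cancel() { done = true; }
    });
  }
}
```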
bq. I believe that a stream should act like a "topic" in JMS, so every
consumer sees every row
That gets really, really tricky, depending on what you are going for. If you
want to use the stream to see what is happening 'right now' (e.g. a realtime
dashboard), then replaying the entire stream is exactly _not_ what you want.
However, if you are looking to analyze some large stream of data and get a
general result, then when do you stop? You then need to specify a windowing of
some sort. It could be that you avoid looking at the whole stream by providing
a window start of 'now', but that should be pretty easy - my inclination is
that most people aren't often going to replay the whole stream.
In the union case, you would still see every row as it comes in, but it could
live as two queries on the stream.
> Streams/Slow iterators dont close on statement close
> ----------------------------------------------------
>
> Key: CALCITE-849
> URL: https://issues.apache.org/jira/browse/CALCITE-849
> Project: Calcite
> Issue Type: Bug
> Reporter: Jesse Yates
> Assignee: Julian Hyde
> Fix For: 1.5.0
>
> Attachments: calcite-849-bug.patch
>
>
> This is easily seen when querying an infinite stream with a clause that
> cannot be matched
> {code}
> select stream PRODUCT from orders where PRODUCT LIKE 'noMatch';
> select stream * from orders where PRODUCT LIKE 'noMatch';
> {code}
> The issue arises when accessing the results in a multi-threaded context. Yes,
> it's not a good idea (and things will break, like here). However, this case
> feels like it ought to be an exception.
> Suppose you are accessing a stream and have a query that doesn't match
> anything on the stream for a long time. Because of the way a ResultSet is
> built, the call to executeQuery() will hang until the first matching result
> is received. In that case, you might want to cancel the query because it's
> taking so long. You also want the thing that's accessing the stream (the
> StreamTable implementation) to cancel the querying/collection - via a call to
> close on the passed iterator/enumerable.
> Since the first result was never generated, the ResultSet was never returned
> to the caller. You can get around this by using a second thread and keeping a
> handle to the creating statement. When you go to close that statement though,
> you end up not closing the cursor (and the underlying iterables/enumerables)
> because it never finished getting created.
> It gets even more problematic if you use select *, as the iterable doesn't
> finish getting created in the AvaticaResultSet.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)