[
https://issues.apache.org/jira/browse/CALCITE-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15011584#comment-15011584
]
Julian Hyde commented on CALCITE-849:
-------------------------------------
bq. So then how do we interrupt the filter iteration? Or is that source's
enumerable also managed via push-based scheduling?
Right. If the source is an enumerable then a thread will need to pull on that
enumerable to get more rows, then put the rows into a queue. That thread may
spend most of its time blocked, waiting for more rows, so it should not be
regarded as part of the thread pool.
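A minimal sketch of that dedicated reader thread, using a plain java.util.Iterator to stand in for Enumerator and a BlockingQueue as the hand-off to the scheduler (the class name SourceReader and the row type Object[] are illustrative assumptions, not Calcite API):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: a dedicated thread pulls rows from a blocking,
 * pull-based source and hands them to the scheduler via a queue. The
 * thread is expected to spend most of its time blocked, so it is kept
 * outside the scheduler's worker pool. */
final class SourceReader implements Runnable {
  private final Iterator<Object[]> source; // stands in for Enumerator
  private final BlockingQueue<Object[]> queue;

  SourceReader(Iterator<Object[]> source, BlockingQueue<Object[]> queue) {
    this.source = source;
    this.queue = queue;
  }

  @Override public void run() {
    try {
      while (source.hasNext()) {
        queue.put(source.next()); // blocks if the scheduler is behind
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // cancel: stop pulling
    }
  }
}
```

Because the only blocking calls (source.next, queue.put) live in this one thread, cancellation is just an interrupt of that thread; the scheduled nodes never see it.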
In general, the input to a filter will be another scheduled node (or to be
precise a buffer that is one of the outputs of a node). The filter's run method
will usually get a few rows of input, apply an expression to each of them, and
write a few rows of output. This will happen very quickly, so we will not want
to interrupt it. The only exception is if the expression calls a UDF that can
take an unbounded amount of time. That should be dealt with as a special case,
in my opinion. Cancels require locks or other synchronization, and I love the
idea that a typical filter can run lock-free.
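To make the lock-free idea concrete, here is a sketch of such a filter node; the class name FilterNode and its offer/output methods are assumptions for illustration, not the actual scheduler interface:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical sketch of a scheduled filter node: run() drains whatever
 * input happens to be buffered, applies the predicate to each row, and
 * returns. No locks, no blocking, nothing to interrupt; the scheduler
 * simply calls run() again when more input arrives. */
final class FilterNode {
  private final Deque<Object[]> input = new ArrayDeque<>();
  private final Deque<Object[]> output = new ArrayDeque<>();
  private final Predicate<Object[]> condition;

  FilterNode(Predicate<Object[]> condition) {
    this.condition = condition;
  }

  void offer(Object[] row) { input.add(row); }
  Deque<Object[]> output() { return output; }

  /** Processes the currently buffered rows, then yields to the scheduler. */
  void run() {
    Object[] row;
    while ((row = input.poll()) != null) {
      if (condition.test(row)) {
        output.add(row);
      }
    }
  }
}
```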
bq. So the client then submits a request in the form of a sink to the scheduler
and the scheduler attempts to fulfill it when it gets a chance by making a
request to the source, which pushes its state up to the scheduler when it has a
row ready?
Yes.
bq. Then, if the scheduler is just passing along the response to that request,
how do you manage further down in the source the "1 second's worth of data"
type of logic without a poll + timeout?
Timeouts, locking, and batching only need to occur on the edges of the graph.
In my scheme, inside the graph we wouldn't use Enumerator at all -- because
reading rows is blocking, it removes flexibility from scheduling; and because
it is row-at-a-time, we don't get good locality.
On input to the graph, if we are reading from a pull-based source (say an
Enumerator) we would have a thread that blocks, reads a row, places it in a
buffer, and sends the buffer when N rows have accumulated or T seconds have
elapsed. If the T seconds expire during a blocking call to Enumerator.next,
the scheduler would steal the partially full buffer.
If we need push-based input we do something similar, stealing a partially full
buffer every T seconds.
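The "N rows or T seconds" edge batching could look something like the following sketch (the Batcher class, its maybeFlush hook, and the millisecond granularity are assumptions for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of edge-of-graph batching: accumulate rows until
 * either N rows have arrived or T milliseconds have elapsed, then send
 * the (possibly partially full) buffer downstream. */
final class Batcher {
  private final int maxRows;    // N
  private final long maxMillis; // T
  private final BlockingQueue<List<Object[]>> downstream =
      new LinkedBlockingQueue<>();
  private List<Object[]> buffer = new ArrayList<>();
  private long deadline;

  Batcher(int maxRows, long maxMillis) {
    this.maxRows = maxRows;
    this.maxMillis = maxMillis;
    this.deadline = System.currentTimeMillis() + maxMillis;
  }

  void add(Object[] row) {
    buffer.add(row);
    maybeFlush();
  }

  /** Also called from a timer (the scheduler's "steal"), so a slow source
   * cannot hold a partially full buffer forever. */
  void maybeFlush() {
    if (!buffer.isEmpty()
        && (buffer.size() >= maxRows
            || System.currentTimeMillis() >= deadline)) {
      downstream.add(buffer);
      buffer = new ArrayList<>();
      deadline = System.currentTimeMillis() + maxMillis;
    }
  }

  BlockingQueue<List<Object[]>> downstream() { return downstream; }
}
```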
On output from the graph, if we need pull-based output we can easily build an
Enumerator or ResultSet on top of BlockingQueue.poll. (Interpreter.enumerator
does something similar, but since it is based on ArrayQueue rather than
BlockingQueue it cannot time out.)
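A sketch of that pull-based edge, with a plain class standing in for Enumerator/ResultSet; the QueueIterator name, the end-of-stream sentinel, and the 100 ms poll interval are illustrative assumptions:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of pull-based output at the edge of the graph: a
 * cursor built on BlockingQueue.poll with a timeout, so a cancel can be
 * noticed even when no rows are arriving (e.g. a filter that matches
 * nothing on an infinite stream). */
final class QueueIterator {
  private static final Object[] EOS = new Object[0]; // end-of-stream marker
  private final BlockingQueue<Object[]> queue;
  private volatile boolean cancelled;

  QueueIterator(BlockingQueue<Object[]> queue) {
    this.queue = queue;
  }

  void cancel() { cancelled = true; }

  static Object[] endOfStream() { return EOS; }

  /** Returns the next row, or null at end of stream or after cancel. */
  Object[] poll() throws InterruptedException {
    while (!cancelled) {
      Object[] row = queue.poll(100, TimeUnit.MILLISECONDS);
      if (row == EOS) {
        return null; // producer is done
      }
      if (row != null) {
        return row;
      }
      // Timed out with no row: loop and re-check the cancel flag.
    }
    return null;
  }
}
```

The timeout is what a cursor built on a non-blocking ArrayQueue cannot provide: without it, a consumer blocked waiting for the first row has no chance to observe the cancel flag.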
If we need push-based output we use your Subscription mechanism.
bq. Am I getting anywhere close to what you are thinking? Sorry to run around
on this, just want to avoid writing a bunch of code that is completely the
wrong direction from what you are thinking.
I appreciate your patience - we are starting from different (well reasoned)
places and it takes time and effort from both sides to converge.
The key point in my philosophy - which I hope is coming through - is that
stream processing is symmetric in terms of produce/consume and push/pull, and
that push and pull are BOTH too limiting inside the engine. Hence I want to
keep Enumerator out of the engine. At the edges we can afford locking,
timeouts, spare threads, and we can easily convert to/from Enumerators. But
inside the engine there is only the humble cooperative "run" method that
yields every few (thousand) rows.
spare threads and we can easily convert to/from Enumerators. But inside the
engine there is only the humble cooperative "run" method that yields every few
(thousand) rows.
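The cooperative contract could be as small as the following sketch; the CooperativeNode name, the QUOTA constant, and the boolean re-schedule signal are assumptions for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the cooperative contract: each call to run()
 * processes at most QUOTA rows and then returns, yielding the worker
 * thread back to the scheduler with no preemption and no locks. */
final class CooperativeNode {
  static final int QUOTA = 4096; // "every few (thousand) rows"
  private final Deque<Object[]> input = new ArrayDeque<>();
  private long processed;

  void offer(Object[] row) { input.add(row); }
  long processed() { return processed; }

  /** Returns true if input remains, i.e. the scheduler should run me again. */
  boolean run() {
    for (int i = 0; i < QUOTA; i++) {
      Object[] row = input.poll();
      if (row == null) {
        return false; // out of input; wait to be rescheduled
      }
      processed++; // stand-in for real per-row work
    }
    return !input.isEmpty(); // quota spent; yield the thread
  }
}
```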
> Streams/Slow iterators don't close on statement close
> -----------------------------------------------------
>
> Key: CALCITE-849
> URL: https://issues.apache.org/jira/browse/CALCITE-849
> Project: Calcite
> Issue Type: Bug
> Reporter: Jesse Yates
> Assignee: Julian Hyde
> Fix For: 1.5.0
>
> Attachments: calcite-849-bug.patch
>
>
> This is easily seen when querying an infinite stream with a clause that
> cannot be matched
> {code}
> select stream PRODUCT from orders where PRODUCT LIKE 'noMatch';
> select stream * from orders where PRODUCT LIKE 'noMatch';
> {code}
> The issue arises when accessing the results in a multi-threaded context. Yes,
> it's not a good idea (and things will break, like here). However, this case
> feels like it ought to be an exception.
> Suppose you are accessing a stream and have a query that doesn't match
> anything on the stream for a long time. Because of the way a ResultSet is
> built, the call to executeQuery() will hang until the first matching result
> is received. In that case, you might want to cancel the query because it's
> taking so long. You also want the thing that's accessing the stream (the
> StreamTable implementation) to cancel the querying/collection - via a call to
> close on the passed iterator/enumerable.
> Since the first result was never generated, the ResultSet was never returned
> to the caller. You can get around this by using a second thread and keeping a
> handle to the creating statement. When you go to close that statement though,
> you end up not closing the cursor (and the underlying iterables/enumerables)
> because it never finished getting created.
> It gets even more problematic if you use select *, as the iterable doesn't
> finish getting created in the AvaticaResultSet.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)