Enumerable processes one record at a time and is therefore inherently slow. Sink is a step in the right direction because it collects batches of records.
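To make the two extremes concrete, here is a minimal sketch in plain Java. It is only an illustration: `PushVersusPull`, `pullSource` and `pushSource` are hypothetical names, not Calcite classes, and the "rows" are just integers. The pull variant hands over one row per call, the way an enumerator does; the push variant fills a deque-backed buffer completely before the consumer sees anything, which is roughly what happens when a single node runs to completion into a sink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

/** Hypothetical sketch; none of these names are Calcite classes. */
final class PushVersusPull {
  /** Pull style: a row is produced only when the consumer asks for it. */
  static Iterator<Integer> pullSource(int rowCount, List<String> log) {
    return new Iterator<Integer>() {
      private int next = 0;

      @Override public boolean hasNext() {
        return next < rowCount;
      }

      @Override public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        log.add("produce " + next);
        return next++;
      }
    };
  }

  /** Push style: the producer fills the whole buffer before anyone reads. */
  static Queue<Integer> pushSource(int rowCount, List<String> log) {
    Queue<Integer> buffer = new ArrayDeque<>();
    for (int i = 0; i < rowCount; i++) {
      log.add("produce " + i);
      buffer.add(i);
    }
    return buffer;
  }

  /** Returns the order of produce/consume events for the pull style. */
  static List<String> runPull(int rowCount) {
    List<String> log = new ArrayList<>();
    Iterator<Integer> rows = pullSource(rowCount, log);
    while (rows.hasNext()) {
      log.add("consume " + rows.next());
    }
    return log;
  }

  /** Returns the order of produce/consume events for the push style. */
  static List<String> runPush(int rowCount) {
    List<String> log = new ArrayList<>();
    Queue<Integer> buffer = pushSource(rowCount, log);
    Integer row;
    while ((row = buffer.poll()) != null) {
      log.add("consume " + row);
    }
    return log;
  }
}
```

`PushVersusPull.runPull(2)` yields `[produce 0, consume 0, produce 1, consume 1]`, while `PushVersusPull.runPush(2)` yields `[produce 0, produce 1, consume 0, consume 1]`. The first never holds more than one row; the second holds every row at once.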
The next step is to improve the scheduling, so that a node is invoked to process a bounded number of records (say for X nanoseconds, or until its input is empty, or until its output has at least N rows or B bytes) rather than, as at present, processing all of its input in one go. The best compromise between efficiency and memory usage lies somewhere between one-at-a-time and all-rows-at-once. As I said, the Arrow adapter will take a further big step in that direction.

> On May 30, 2018, at 1:08 PM, Muhammad Gelbana <[email protected]> wrote:
>
> Is it correct to think that the Interpreter can be improved by removing the
> "sink" mechanism since buffering is only reasonable within the receiving
> node context, not the sending one? In other words, a node acting as an
> input to another one should always send an enumerable and the
> receiving/parent node is free to buffer the obtained rows or process them
> right away.
>
> Thanks,
> Gelbana
>
> On Wed, May 30, 2018 at 12:37 AM, Julian Hyde <[email protected]> wrote:
>
>> Yes, when you use a Sink there is an assumption that there is a Node
>> running that is consuming from the deque. Currently the Interpreter only
>> runs one Node at a time, which means that the full output of that Node sits
>> in a deque for a while.
>>
>> Clearly the Interpreter has much room for improvement.
>>
>>> On May 29, 2018, at 3:22 PM, Muhammad Gelbana <[email protected]> wrote:
>>>
>>> I found out what was consuming the memory and delaying the results at the
>>> same time. I was pushing all obtained rows from the datasource into a sink
>>> created by this method
>>> <https://github.com/apache/calcite/blob/27a190ff303700b4329384e05c39bc40c893048e/core/src/main/java/org/apache/calcite/interpreter/Compiler.java#L50>.
>>> Pushing rows into the sink halts the execution of further nodes until all
>>> rows are totally loaded.
>>> I thought since the sink is backed by an "ArrayDeque" that
>>> the rows would be consumed while being pushed to the sink.
>>>
>>> The other approach I applied was to use the "enumerable" method instead.
>>> This way, returned rows from my nodes are available for successive nodes
>>> without delay.
>>>
>>> Thank you all and thank you Julian for the Arrow adapter code.
>>>
>>> Thanks,
>>> Gelbana
>>>
>>> On Tue, May 29, 2018 at 5:50 PM, Julian Hyde <[email protected]> wrote:
>>>
>>>> I believe that scan, filter, project do not buffer; aggregate, join and
>>>> sort do buffer; join perhaps buffers a little more than it should.
>>>>
>>>> Read methods in EnumerableDefaults, for example EnumerableDefaults.join,
>>>> to see where a blocking collection is created and from which input.
>>>>
>>>> Ideally the operators would exploit sorted input (e.g. we could have an
>>>> aggregate that assumes input is sorted by the GROUP BY key and only buffers
>>>> records that have the same key) but Enumerable does not aim to be a
>>>> high-performance, scalable engine, so this never got prioritized.
>>>>
>>>> On a related note, I was pleased to see progress on an Arrow adapter and
>>>> convention in https://issues.apache.org/jira/browse/CALCITE-2173. If we
>>>> were to write a high-performance engine that scales across many threads,
>>>> it would be based on Arrow. So anyone with complaints about the
>>>> performance of Enumerable convention should start contributing to Arrow
>>>> convention!
>>>>
>>>> Julian
>>>>
>>>>
>>>>> On May 29, 2018, at 7:20 AM, Michael Mior <[email protected]> wrote:
>>>>>
>>>>> In theory it certainly should be possible to stream the results. This isn't
>>>>> guaranteed however. You would have to look at the entire query pipeline to
>>>>> see where things are being materialized. A full stack trace without
>>>>> elements removed would be a good start.
>>>>>
>>>>> --
>>>>> Michael Mior
>>>>> [email protected]
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 28, 2018 at 19:05, Muhammad Gelbana <[email protected]> wrote:
>>>>>
>>>>>> I'm not sure if I phrased my question correctly so let me explain more.
>>>>>>
>>>>>> I'm running a (SELECT * FROM TABLE) query against a 50 million records
>>>>>> table (following the BINDABLE convention, so it sends its rows through a
>>>>>> "sink"). Since the extracted rows aren't processed in any way, I was
>>>>>> expecting that the output JDBC resultset would be able to enumerate through
>>>>>> all the results in a matter of seconds, but instead, my machine didn't
>>>>>> print anything. What exactly happens is that
>>>>>> (PreparedStatement.executeQuery) doesn't return a resultset promptly even
>>>>>> after a few minutes have passed.
>>>>>>
>>>>>> I tried a table with hundreds of rows and my testing code printed those
>>>>>> results right away so it's not something I missed there, but probably a
>>>>>> configuration I didn't set? Or maybe that's just how it is? Does anyone
>>>>>> else believe that the behaviour I expected is reasonable? It would also
>>>>>> lower the amount of memory consumed to hold the complete results before
>>>>>> bursting them to their final destination, if that's the case in the first
>>>>>> place.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Gelbana
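For anyone who wants to experiment with the scheduling idea from the top of this thread (invoke a node until its output reaches N rows, or until its input is empty, instead of letting it run to completion), here is a minimal sketch. It is an assumption-laden toy, not the Calcite Interpreter: `SliceScheduler`, `run` and `maxRowsPerSlice` are hypothetical names, there is one producer and one consumer on a single thread, and only the row limit is modelled (the time limit and byte limit are left out).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of bounded-slice scheduling; not Calcite's Interpreter. */
final class SliceScheduler {
  /**
   * Alternates between a producer slice and a consumer slice.
   *
   * @param totalRows number of rows the producer will emit in total
   * @param maxRowsPerSlice the "N rows" output limit for one producer slice
   * @return {rows delivered to the consumer, peak number of buffered rows}
   */
  static int[] run(int totalRows, int maxRowsPerSlice) {
    if (maxRowsPerSlice <= 0) {
      throw new IllegalArgumentException("maxRowsPerSlice must be positive");
    }
    Deque<Integer> buffer = new ArrayDeque<>();
    List<Integer> delivered = new ArrayList<>();
    int nextRow = 0;
    int peakBuffered = 0;
    while (nextRow < totalRows || !buffer.isEmpty()) {
      // Producer slice: stop once N rows have been emitted or the source is done.
      for (int i = 0; i < maxRowsPerSlice && nextRow < totalRows; i++) {
        buffer.add(nextRow++);
      }
      peakBuffered = Math.max(peakBuffered, buffer.size());
      // Consumer slice: stop once its input is empty.
      Integer row;
      while ((row = buffer.poll()) != null) {
        delivered.add(row);
      }
    }
    return new int[] {delivered.size(), peakBuffered};
  }
}
```

`SliceScheduler.run(10, 10)` behaves like the all-rows-at-once case and returns `{10, 10}`: all ten rows sit in the deque before the consumer runs. `SliceScheduler.run(10, 4)` delivers the same ten rows but returns a peak of 4, and `SliceScheduler.run(10, 1)` degenerates to one-at-a-time with a peak of 1. The slice size is the knob that trades per-invocation overhead against memory held in the buffer.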
