On Fri, Jul 12, 2019 at 5:43 PM Julian Hyde <[email protected]> wrote:
> In practice, the rowCount is just a number. So you can think of it as
> rows-per-second if you are optimizing a continuous query.
>
> If you are using a table in a streaming query, does it have a "rows per
> second"? Yes - it is the number of rows in the table multiplied by the
> number of times per second that the table is scanned.
>
> I don't know of any good academic work on optimizing continuous queries.
> (There is work on self-tuning queries, as indeed there is plenty of work
> on self-tuning sort & join algorithms for traditional DBs, but nothing I
> know of that bridges streaming queries to classical cost-based query
> optimization.) I will remark that streaming queries have one big
> disadvantage and one big advantage. The disadvantage is that there are no
> stats available at the start, so you are not able to do cost-based
> optimization. The advantage is that the query runs forever, so you can
> start off with a bad plan, gather stats, and periodically re-optimize.
> The optimal plan might change over time as the streams change volume.

Yea, this is a very tricky point. For Beam SQL backends like Flink and
Dataflow, re-planning based on the gathered info or on changes in the
nature of the inputs would require data migrations beyond current
capabilities. Do you know of a system that dynamically re-plans and
migrates intermediate state? FWIW, you might be able to have some stats at
the start from a metadata store, but they are of course subject to change.

Kenn

> Julian
>
> > On Jul 10, 2019, at 4:23 PM, Stamatis Zampetakis <[email protected]> wrote:
> >
> > Looking forward to the outcome :)
> >
> > Below are a few comments regarding the extensibility concerns of Kenn.
> >
> > In order to find the best plan, the VolcanoPlanner just needs to know if
> > one cost is less than another cost [1], and this is encapsulated in the
> > isLe/isLt methods [2].
> > Adding a new cost class (other than VolcanoCost) which implements the
> > RelOptCost interface and respects its contract should work.
> > The VolcanoPlanner can be instantiated with a custom RelOptCostFactory
> > [3] which returns any kind of RelOptCost object.
> >
> > It is true that there is a strong link with CPU, I/O, and cardinality
> > metrics, but it is hard to imagine an optimizer that does not take
> > these into consideration.
> > At the moment the cost comparison in Volcano is rather simple, since it
> > uses only the cardinality estimations [4], so I guess we could improve
> > on this. However, recent studies [5] have shown that cardinality
> > estimations matter much more than other metrics (such as I/O and CPU),
> > so in the end it may not be worth the effort.
> >
> > Best,
> > Stamatis
> >
> > [1] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java#L348
> > [2] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/RelOptCost.java#L83
> > [3] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L263
> > [4] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoCost.java#L113
> > [5] Viktor Leis, Andrey Gubichev, Atanas Mirchev, Peter A. Boncz, Alfons
> > Kemper, Thomas Neumann: How Good Are Query Optimizers, Really? PVLDB
> > 9(3): 204-215 (2015) (http://www.vldb.org/pvldb/vol9/p204-leis.pdf)
> >
> > On Wed, Jul 10, 2019 at 11:45 PM Alireza Samadian
> > <[email protected]> wrote:
> >
> >> Dear Stamatis,
> >> Thank you for your reply. I will probably go with overriding
> >> computeSelfCost() as the first step. I checked it, and it seems to be
> >> working.
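The extension point Stamatis describes — a new cost class implementing RelOptCost, handed to the VolcanoPlanner via a custom RelOptCostFactory — can be roughly sketched as follows. This is a simplified, self-contained stand-in, not Calcite's actual API: the `Cost` interface below mimics only the comparison portion of the RelOptCost contract (isLe/isLt), and the `StreamCost` class and its `rate` field are hypothetical names for illustration.

```java
// Hypothetical sketch of a rate-based cost for streaming plans.
// "Cost" mimics only the comparison part of Calcite's RelOptCost
// contract (isLe/isLt); it is NOT the real interface.
interface Cost {
    boolean isLe(Cost other);   // this cost <= other
    boolean isLt(Cost other);   // this cost <  other
    Cost plus(Cost other);      // accumulate costs bottom-up
}

class StreamCost implements Cost {
    final double rate;          // estimated tuples/second processed

    StreamCost(double rate) { this.rate = rate; }

    @Override public boolean isLe(Cost other) {
        return rate <= ((StreamCost) other).rate;
    }
    @Override public boolean isLt(Cost other) {
        return rate < ((StreamCost) other).rate;
    }
    @Override public Cost plus(Cost other) {
        return new StreamCost(rate + ((StreamCost) other).rate);
    }
}
```

A factory producing `StreamCost` instances would then play the role of the custom RelOptCostFactory mentioned above; as long as the comparison methods define a consistent ordering, the planner's "is this plan cheaper?" question keeps working.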
> >>
> >> Dear Kenn,
> >> The cited paper estimates those two values for each node and passes
> >> them up, but they are not the cost. The cost of a node depends on the
> >> operation we are performing on the input and the rate of the input
> >> (input to that relational node). So for all of the nodes the cost is
> >> modeled as c*rate, where c is the number of operations per tuple and
> >> rate is the rate of the input. It might be possible to have some other
> >> factors in the calculation of the cost for each node. So in the end
> >> there will always be a single scalar as the cost of a node. This
> >> single scalar can be a function of the number of rows accessed, the
> >> number of I/O accesses, etc. In Calcite it is assumed that any value
> >> we are getting as a cost is going to be a function of (row_count, CPU,
> >> I/O). Note that in the streaming model there is no need for the window
> >> to be in the cost (the cost does not depend on it); I am including it
> >> in the cost model only because estimating the output rate of the nodes
> >> depends on it, and I don't know any other way to get it from the
> >> inputs of the RelNodes.
> >>
> >> Best,
> >> Alireza
> >>
> >> On Wed, Jul 10, 2019 at 12:40 PM Kenneth Knowles <[email protected]> wrote:
> >>
> >>> Following this discussion, I have a question which I think is on
> >>> topic. It seems like there are two places that, from my brief
> >>> reading, are not quite extensible enough.
> >>>
> >>> 1. RelNode.computeSelfCost returns a RelOptCost that has particular
> >>> measures built in. Would Alireza's proposal require extensibility
> >>> here to add/remove measures? The planner seems to depend on them
> >>> being CPU, IO, rows.
> >>> 2. But the VolcanoPlanner also just adds the above together into a
> >>> single scalar. Does the cited paper avoid this practice and instead
> >>> retain both measures?
> >>>
> >>> Again, I'm just jumping around in the code to educate myself.
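The c*rate model Alireza describes above reduces every node's cost to a single scalar: a per-tuple constant times the input rate, summed across the plan. A minimal arithmetic sketch of that idea (the operator names, constants, and selectivity below are made up for illustration, not measured values):

```java
// Sketch of the cost model described above: cost(node) = c * inputRate,
// where c is operations-per-tuple; the plan cost is the sum over nodes.
// All constants here are illustrative.
class RateCostModel {
    static double nodeCost(double opsPerTuple, double inputRate) {
        return opsPerTuple * inputRate;
    }

    public static void main(String[] args) {
        double scanRate = 1000.0;                         // tuples/sec from the source
        double filterCost = nodeCost(1.0, scanRate);      // filter: 1 op/tuple
        double filteredRate = scanRate * 0.1;             // assumed selectivity 0.1
        double projectCost = nodeCost(0.5, filteredRate); // project: 0.5 ops/tuple
        double planCost = filterCost + projectCost;       // single scalar for the plan
        System.out.println(planCost);                     // 1000*1.0 + 100*0.5 = 1050.0
    }
}
```

The point of the sketch is the shape of the computation, not the numbers: each node's rate feeds its parent's cost, and the final comparison between plans happens on one scalar, exactly as in Calcite's VolcanoCost.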
> >>>
> >>> Kenn
> >>>
> >>> On Mon, Jul 8, 2019 at 4:02 PM Stamatis Zampetakis <[email protected]>
> >>> wrote:
> >>>
> >>>> Hi Alireza,
> >>>>
> >>>> Cost models for streams are a very cool topic, but I don't have much
> >>>> knowledge in the domain.
> >>>>
> >>>> Regarding the implementation details, if you have custom physical
> >>>> operators then it makes sense to implement the computeSelfCost()
> >>>> function as you see fit.
> >>>>
> >>>> Another option is to plug in your custom RelMetadataProvider [1];
> >>>> you can find a few examples in RelMetadataTest [2].
> >>>> That way you can also change the cost function of existing operators
> >>>> (logical or not) without changing the operators themselves.
> >>>>
> >>>> As far as it concerns the cost of logical operators, the behavior of
> >>>> the planner can be customized [3].
> >>>> The most common configuration is to ignore the cost of logical
> >>>> operators, leaving it as infinite.
> >>>>
> >>>> Best,
> >>>> Stamatis
> >>>>
> >>>> [1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataProvider.java
> >>>> [2] https://github.com/apache/calcite/blob/e8d598a434e8dbadaf756f8c57c748f4d7e16fdf/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java#L1005
> >>>>
> >>>> On Tue, Jul 2, 2019 at 11:23 PM Alireza Samadian
> >>>> <[email protected]> wrote:
> >>>>
> >>>>> Dear Members of the Calcite Community,
> >>>>>
> >>>>> I'm working on Apache Beam SQL and we use Calcite for query
> >>>>> optimization. We represent both tables and streams as a subclass of
> >>>>> AbstractQueryableTable. In Calcite's implementation of the cost
> >>>>> model and statistics, one of the key elements is row count. Also,
> >>>>> all the nodes can present a row count estimation based on their
> >>>>> inputs.
> >>>>> For instance, in the case of joins, the row count is estimated by:
> >>>>> left.rowCount * right.rowCount * selectivity_estimate.
> >>>>>
> >>>>> My first question is: what is the correct way of representing
> >>>>> streams in Calcite's optimizer? Does Calcite still use row count
> >>>>> for streams? If so, what does row count represent in the case of
> >>>>> streams?
> >>>>>
> >>>>> In [1] they suggest using both window (size of the window in terms
> >>>>> of tuples) and rate to represent the output of all nodes in stream
> >>>>> processing systems, and for every node these two values are
> >>>>> estimated. For instance, they suggest estimating the window and
> >>>>> rate of joins using:
> >>>>> join_rate = (left_rate*right_window + right_rate*left_window)*selectivity
> >>>>> join_window = (left_window*right_window)*selectivity
> >>>>>
> >>>>> We were thinking about using this approach for Beam SQL; however, I
> >>>>> am wondering where the point of extension would be. I was thinking
> >>>>> of implementing computeSelfCost() using a different cost model
> >>>>> (rate, window_size) for our physical RelNodes, in which we don't
> >>>>> call estimate_row_count and instead use the inputs' non-cumulative
> >>>>> cost to estimate the node's cost. However, I am not sure if this is
> >>>>> a good approach and whether it can potentially cause problems in
> >>>>> the optimization (because there will still be logical nodes that
> >>>>> are implemented in Calcite and may use row count estimation). Does
> >>>>> Calcite use cost estimation for logical nodes such as logical join,
> >>>>> or does it only calculate the cost when the nodes are physical?
> >>>>>
> >>>>> I would appreciate it if someone could help me. I would also
> >>>>> appreciate it if someone has other suggestions for stream query
> >>>>> optimization.
> >>>>>
> >>>>> Best,
> >>>>> Alireza Samadian
> >>>>>
> >>>>> [1] Ayad, Ahmed M., and Jeffrey F.
> >>>>> Naughton. "Static optimization of conjunctive queries with sliding
> >>>>> windows over infinite streams." *Proceedings of the 2004 ACM SIGMOD
> >>>>> international conference on Management of data*. ACM, 2004.
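The window/rate estimates from [1] quoted in Alireza's original message translate directly into code. A small sketch — the method and parameter names are mine; the formulas are the ones quoted in the thread:

```java
// Sketch of the stream-join estimates quoted above from Ayad & Naughton [1]:
//   join_rate   = (left_rate*right_window + right_rate*left_window) * selectivity
//   join_window = (left_window*right_window) * selectivity
// Rates are in tuples/second; windows are in tuples.
class StreamJoinEstimates {
    static double joinRate(double leftRate, double leftWindow,
                           double rightRate, double rightWindow,
                           double selectivity) {
        return (leftRate * rightWindow + rightRate * leftWindow) * selectivity;
    }

    static double joinWindow(double leftWindow, double rightWindow,
                             double selectivity) {
        return leftWindow * rightWindow * selectivity;
    }
}
```

For example (numbers purely illustrative): two streams at 100 tuples/sec with 1000-tuple windows and selectivity 0.01 give join_rate = (100*1000 + 100*1000)*0.01 = 2000 tuples/sec and join_window = 1000*1000*0.01 = 10000 tuples. Note the asymmetry with the classical `left.rowCount * right.rowCount * selectivity` estimate: the output rate depends on each stream's rate times the *other* stream's window.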
