Row count is not the only statistic you can use in computing your cost function. You can use many others (e.g. selectivity, column cardinality, predicates).
So, if you decide to use rows-per-minute as your cost function, you can compute rows-per-minute of the join based on a combination of rows-per-minute and row-count of the input streams.

Julian

> On Jul 15, 2019, at 11:57 AM, Alireza Samadian <[email protected]> wrote:
>
> Hi Julian,
>
> Thank you for your reply.
>
> I think the problem with interpreting rowCount as the rate is that we need both
> rate and window size of the inputs for the estimation of output rate of the
> RelNodes, and this cannot be embedded into a single number.
>
> As an example, let A and B be two windowed streams with rates r(A) and r(B)
> that are joined (for simplicity let's assume it's a cross join) and we want
> to estimate the output rate of the join. The output rate cannot be
> estimated as r(A)r(B). That is because even if the rates are small, having
> a large window size causes us to produce many tuples per unit of time in
> the output. Or take the example of an aggregation RelNode. The output rate of
> aggregation depends on both rate and window size of its input because for
> every window it is emitting one tuple.
>
> For a combination of bounded sources and unbounded sources, in many cases the
> number of times that a table is scanned is once in the total execution. For
> instance, let U1 be an unbounded source, and B1 and B2 be bounded sources.
> Then in Join(U1, Join(B1,B2)) both B1 and B2 are scanned only once, which
> means the suggested rows per second for them is zero. This can
> potentially cause a problem if a part of the query plan is bounded sources
> joining together (the row count for all of them will be zero). Furthermore,
> for bounded sources, we can consider rate=0 and the row count can be
> interpreted as a window size (in terms of tuples) and that can be used when
> we are trying to estimate the output rate of Join(U1,B1); whereas, I am not
> sure how considering rows per second for bounded sources can help us
> estimate the output rate.
>
> Best,
> Alireza
>
> On Fri, Jul 12, 2019 at 5:43 PM Julian Hyde <[email protected]> wrote:
>
>> In practice, the rowCount is just a number. So you can think of it as
>> rows-per-second if you are optimizing a continuous query.
>>
>> If you are using a table in a streaming query, does it have a “rows per
>> second”? Yes - it is the number of rows in the table multiplied by the
>> number of times per second that the table is scanned.
>>
>> I don’t know of any good academic work on optimizing continuous queries.
>> (There is work on self-tuning queries, as indeed there is plenty of work on
>> self-tuning sort & join algorithms for traditional DB, but nothing I know
>> that bridges streaming query to classical cost-based query optimization.) I
>> will remark that streaming queries have one big disadvantage and one big
>> advantage. The disadvantage is that there are no stats available at the
>> start, so you are not able to do cost-based optimization. The advantage is
>> that the query runs forever, so you can start off with a bad plan, gather
>> stats, and periodically re-optimize. The optimal plan might change over
>> time as the streams change volume.
>>
>> Julian
>>
>>
>>> On Jul 10, 2019, at 4:23 PM, Stamatis Zampetakis <[email protected]> wrote:
>>>
>>> Looking forward to the outcome :)
>>>
>>> Below are a few comments regarding the extensibility concerns of Kenn.
>>>
>>> In order to find the best plan the VolcanoPlanner just needs to know if one
>>> cost is less than another cost [1] and this is encapsulated in the
>>> isLe/isLt methods [2].
>>> Adding a new cost class (other than VolcanoCost) which implements the
>>> RelOptCost interface and respects its contract should work.
>>> The VolcanoPlanner can be instantiated with a custom RelOptCostFactory [3]
>>> which returns any kind of RelOptCost object.
>>>
>>> It is true that there is a strong link with CPU, I/O, and cardinality
>>> metrics but it is hard to imagine an optimizer that does not take these
>>> into consideration.
>>> At the moment the cost comparison in Volcano is rather simple since it uses
>>> only the cardinality estimations [4] so I guess we could improve on this.
>>> However recent studies [5] have shown that cardinality estimations matter
>>> much more than other metrics (such as I/O and CPU) so in the end it may not
>>> be worth the effort.
>>>
>>> Best,
>>> Stamatis
>>>
>>> [1] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java#L348
>>> [2] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/RelOptCost.java#L83
>>> [3] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L263
>>> [4] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoCost.java#L113
>>> [5] Viktor Leis, Andrey Gubichev, Atanas Mirchev, Peter A. Boncz, Alfons
>>> Kemper, Thomas Neumann: How Good Are Query Optimizers, Really? PVLDB 9(3):
>>> 204-215 (2015) (http://www.vldb.org/pvldb/vol9/p204-leis.pdf)
>>>
>>> On Wed, Jul 10, 2019 at 11:45 PM Alireza Samadian
>>> <[email protected]> wrote:
>>>
>>>> Dear Stamatis,
>>>> Thank you for your reply. I will probably go with overriding
>>>> computeSelfCost() as the first step. I checked it, and it seems to be
>>>> working.
>>>>
>>>> Dear Kenn,
>>>> The cited paper estimates those two values for each node and passes them up
>>>> but they are not the cost. The cost of a node depends on the operation we
>>>> are performing on the input and the rate of the input (input to that
>>>> relational node).
>>>> So for all of the nodes the cost is modeled as c*rate
>>>> where c is the number of operations per tuple and rate is the rate of the
>>>> input. It might be possible to have some other factors in the calculation
>>>> of the cost for each node. So at the end there will always be a single
>>>> scalar as the cost of a node. This single scalar can be a function of
>>>> number of rows accessed, number of I/O accesses, etc. In Calcite it is
>>>> assumed any value that we are getting as a cost is going to be a function of
>>>> (row_count, CPU, I/O). Note that in the streaming model there is no need
>>>> for window to be in the cost (the cost does not depend on it); I am
>>>> including it in the cost model only because estimating the output rate of
>>>> the nodes depends on it, and I don't know any other way to get it from the
>>>> inputs of the RelNodes.
>>>>
>>>> Best,
>>>> Alireza
>>>>
>>>> On Wed, Jul 10, 2019 at 12:40 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Following this discussion, I have a question which I think is on topic.
>>>>> Seems like there's two places that from my brief reading are not quite
>>>>> extensible enough.
>>>>>
>>>>> 1. RelNode.computeSelfCost returns RelOptCost, which has particular measures
>>>>> built in. Would Alireza's proposal require extensibility here to add/remove
>>>>> measures? The planner seems to depend on them being CPU, IO, rows.
>>>>> 2. But the VolcanoPlanner also just adds the above together to a single
>>>>> scalar. Does the cited paper avoid this practice and instead retain both
>>>>> measures?
>>>>>
>>>>> Again, I'm just jumping around in the code to educate myself.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jul 8, 2019 at 4:02 PM Stamatis Zampetakis <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Alireza,
>>>>>>
>>>>>> Cost models for streams is a very cool topic but I don't have much
>>>>>> knowledge in the domain.
>>>>>>
>>>>>> Regarding the implementation details, if you have custom physical operators
>>>>>> then it makes sense to implement the computeSelfCost() function as you see fit.
>>>>>>
>>>>>> Another option is to plug in your custom RelMetadataProvider [1]; you can
>>>>>> find a few examples in RelMetadataTest [2].
>>>>>> That way you can also change the cost function of existing operators
>>>>>> (logical or not) without changing the operators themselves.
>>>>>>
>>>>>> As far as the cost of logical operators is concerned, the behavior of the
>>>>>> planner can be customized [3].
>>>>>> The most common configuration is to ignore the cost of logical operators,
>>>>>> leaving it as infinite.
>>>>>>
>>>>>> Best,
>>>>>> Stamatis
>>>>>>
>>>>>> [1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataProvider.java
>>>>>> [2] https://github.com/apache/calcite/blob/e8d598a434e8dbadaf756f8c57c748f4d7e16fdf/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java#L1005
>>>>>>
>>>>>> On Tue, Jul 2, 2019 at 11:23 PM Alireza Samadian
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Dear Members of Calcite Community,
>>>>>>>
>>>>>>> I'm working on Apache Beam SQL and we use Calcite for query optimization.
>>>>>>> We represent both tables and streams as a subclass of
>>>>>>> AbstractQueryableTable. In Calcite's implementation of cost model and
>>>>>>> statistics, one of the key elements is row count. Also all the nodes can
>>>>>>> present a rowcount estimation based on their inputs. For instance, in the
>>>>>>> case of joins, the rowcount is estimated by:
>>>>>>> left.rowCount*right.rowCount*selectivity_estimate.
>>>>>>>
>>>>>>> My first question is, what is the correct way of representing streams in
>>>>>>> Calcite's Optimizer? Does Calcite still use row_count for streams?
>>>>>>> If so,
>>>>>>> what does row count represent in the case of streams?
>>>>>>>
>>>>>>> In [1] they are suggesting to use both window (size of window in terms of
>>>>>>> tuples) and rate to represent output of all nodes in stream processing
>>>>>>> systems, and for every node these two values are estimated. For instance,
>>>>>>> they suggest to estimate window and rate of the joins using:
>>>>>>> join_rate = (left_rate*right_window + right_rate*left_window)*selectivity
>>>>>>> join_window = (left_window*right_window)*selectivity
>>>>>>>
>>>>>>> We were thinking about using this approach for Beam SQL; however, I am
>>>>>>> wondering where the point of extension would be. I was thinking to
>>>>>>> implement computeSelfCost() using a different cost model (rate,
>>>>>>> window_size) for our physical Rel Nodes, in which we don't call
>>>>>>> estimate_row_count and instead we use inputs' non cumulative cost to
>>>>>>> estimate the node's cost. However, I am not sure if this is a good approach
>>>>>>> and whether this can potentially cause problems in the optimization
>>>>>>> (because there will still be logical nodes that are implemented in Calcite
>>>>>>> and may use row count estimation). Does Calcite use cost estimation for
>>>>>>> logical nodes such as logical join? Or does it only calculate the cost when
>>>>>>> the nodes are physical?
>>>>>>>
>>>>>>> I would appreciate it if someone can help me. I would also appreciate it if
>>>>>>> someone has other suggestions for streams query optimization.
>>>>>>>
>>>>>>> Best,
>>>>>>> Alireza Samadian
>>>>>>>
>>>>>>> [1] Ayad, Ahmed M., and Jeffrey F. Naughton. "Static optimization of
>>>>>>> conjunctive queries with sliding windows over infinite streams."
>>>>>>> *Proceedings of the 2004 ACM SIGMOD international conference on
>>>>>>> Management of data*. ACM, 2004.
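
For readers following the thread, the join estimates quoted in the first message (join_rate and join_window) and the c*rate cost from the later reply can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: StreamStats, bounded(), join() and cost() are hypothetical names invented here, not part of Calcite or Beam SQL, and the sample numbers are made up. A bounded source is modeled as rate=0 with its row count as the window, as suggested in the Jul 15 reply.

```java
/**
 * Hypothetical holder for the two per-node estimates discussed in the thread.
 * Not a Calcite or Beam class; names and numbers are illustrative assumptions.
 */
final class StreamStats {
    final double rate;    // tuples per unit of time
    final double window;  // window size in tuples

    StreamStats(double rate, double window) {
        this.rate = rate;
        this.window = window;
    }

    /** A bounded source: rate = 0, row count interpreted as the window size. */
    static StreamStats bounded(double rowCount) {
        return new StreamStats(0.0, rowCount);
    }

    /** Join estimates using the two formulas quoted in the thread. */
    static StreamStats join(StreamStats left, StreamStats right, double selectivity) {
        double joinRate =
            (left.rate * right.window + right.rate * left.window) * selectivity;
        double joinWindow = left.window * right.window * selectivity;
        return new StreamStats(joinRate, joinWindow);
    }

    /** Scalar cost c*rate: operations per tuple times the input rate. */
    static double cost(double opsPerTuple, double inputRate) {
        return opsPerTuple * inputRate;
    }

    public static void main(String[] args) {
        StreamStats u1 = new StreamStats(10.0, 100.0); // unbounded source (made-up numbers)
        StreamStats b1 = bounded(50.0);                // bounded source
        StreamStats joined = join(u1, b1, 0.5);
        System.out.println("rate=" + joined.rate + " window=" + joined.window);
    }
}
```

Note that joining two bounded sources yields rate 0 but a non-zero window under this model, so the size information is not lost the way it would be with a pure rows-per-second interpretation.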
