Possibly we could also rely on RelOptCostFactory#makeInfiniteCost and RelOptCost#isInfinite.
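A minimal sketch of that idea, modeled in plain Java so it runs on its own rather than against Calcite. `Cost.infinite()` and `Cost#isInfinite()` stand in for what RelOptCostFactory#makeInfiniteCost and RelOptCost#isInfinite would do in a real planner. Everything else here is hypothetical: the class names, the `isStream` flag (which would come from metadata), and the `BUILD_WEIGHT` cost formula. The row counts are the estimates from the plan in Vladimir's message.

```java
/**
 * Hypothetical model of the proposal, not Calcite's API: a hash join whose
 * build side is a stream gets an infinite cost, so the planner never picks it.
 */
public class InfiniteBuildCost {

  /** Hypothetical weight: hashing a row is assumed to cost twice as much as probing. */
  static final double BUILD_WEIGHT = 2.0;

  /** Simplified one-dimensional cost; positive infinity means "never pick this plan". */
  record Cost(double value) {
    static Cost infinite() {
      return new Cost(Double.POSITIVE_INFINITY);
    }

    boolean isInfinite() {
      return Double.isInfinite(value);
    }
  }

  /** Hypothetical join input; in a planner, isStream would come from metadata. */
  record Input(String name, double rowCount, boolean isStream) {}

  /** Cost of a hash join that builds its lookup table from build and probes with probe. */
  static Cost hashJoinCost(Input probe, Input build) {
    if (build.isStream()) {
      // A lookup table over an unbounded input could exhaust memory.
      return Cost.infinite();
    }
    return new Cost(probe.rowCount() + BUILD_WEIGHT * build.rowCount());
  }

  /** Returns the input to build the hash table from, or null if both sides are streams. */
  static Input chooseBuildSide(Input left, Input right) {
    Cost buildRight = hashJoinCost(left, right);
    Cost buildLeft = hashJoinCost(right, left);
    if (buildRight.isInfinite() && buildLeft.isInfinite()) {
      return null; // stream-to-stream: a hash join is not applicable
    }
    return buildRight.value() <= buildLeft.value() ? right : left;
  }

  public static void main(String[] args) {
    Input products = new Input("PRODUCTS", 200, false);
    Input orders = new Input("ORDERS", 100, true);
    // Row counts alone would favor hashing ORDERS (200 + 2 * 100 = 400 vs
    // 100 + 2 * 200 = 500), but because ORDERS is a stream, PRODUCTS is chosen.
    System.out.println(chooseBuildSide(products, orders).name()); // prints PRODUCTS
  }
}
```

Compared with bumping the ORDERS row count, the infinite cost does not depend on how large the estimate is, so the decision stays correct even if the statistics change.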
Best,
Stamatis

On Fri, Jan 3, 2020 at 8:04 PM Rui Wang wrote:

> +1 on having a type of property on RelNode to know which node is stream
> or non-stream. In Apache Beam SQL's practice, stream joins are already
> metadata-driven: if one side is a stream and the other side is
> non-stream, we use a hash-join-like implementation but build the table on
> the non-stream side.
>
> Technically, it's feasible to build the hash table on the stream side even
> if it is infinite (I guess the rationale would be that the stream is very
> small and is joined with TB-level non-stream data). New stream data will
> update the hash table. In this case, the implementation has to update the
> data as new data arrives, which turned out to be difficult to implement.
>
> -Rui
>
> On Fri, Jan 3, 2020 at 10:19 AM Vladimir Sitnikov wrote:
>
> > Hi,
> >
> > Stream tables do not play very well with hash joins.
> > In other words, if a hash join tried to build a lookup table out of a
> > stream, it could just run out of memory.
> >
> > Is there metadata or something like that to identify stream-like inputs,
> > so that a hash join can ensure it does not try to build a lookup table
> > out of the stream?
> >
> > The case is org.apache.calcite.test.StreamTest#testStreamToRelationJoin,
> > which transforms to the plan below.
> > The plan is wrong because it would build the hash lookup out of the second
> > input, which happens to be (infinite?) (STREAM).
> >
> > As a temporary workaround, I will increase the estimated rowcount for the
> > orders table to 100'000, but it would be nice to make those decisions
> > metadata-driven.
> >
> > EnumerableProject(ROWTIME=[$2], ORDERID=[$3], SUPPLIERID=[$1]): rowcount = 3000.0, cumulative cost = {6950.0 rows, 9650.0 cpu, 0.0 io}, id = 603
> >   EnumerableHashJoin(condition=[=($0, $6)], joinType=[inner]): rowcount = 3000.0, cumulative cost = {3950.0 rows, 650.0 cpu, 0.0 io}, id = 602
> >     EnumerableInterpreter: rowcount = 200.0, cumulative cost = {100.0 rows, 100.0 cpu, 0.0 io}, id = 599
> >       BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]]): rowcount = 200.0, cumulative cost = {2.0 rows, 2.0100000000000002 cpu, 0.0 io}, id = 122
> >     EnumerableProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT0=[CAST($2):VARCHAR(32) NOT NULL]): rowcount = 100.0, cumulative cost = {150.0 rows, 550.0 cpu, 0.0 io}, id = 601
> >       EnumerableInterpreter: rowcount = 100.0, cumulative cost = {50.0 rows, 50.0 cpu, 0.0 io}, id = 600
> >         BindableTableScan(table=[[STREAM_JOINS, ORDERS, (STREAM)]]): rowcount = 100.0, cumulative cost = {1.0 rows, 1.01 cpu, 0.0 io}, id = 182
> >
> > Vladimir
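Rui's Beam approach (build the lookup table on the non-stream side and only iterate the stream) can be sketched as a lazy inner hash join. This is a hypothetical illustration, not code from Calcite or Beam: the `join` signature and the `Order`/`Product` records are invented stand-ins for the ORDERS stream and the PRODUCTS table, and the demo stream is deliberately infinite to show that the probe side is never materialized.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

/** Hypothetical stream-to-relation inner hash join: hash the relation, probe with the stream. */
public class StreamToRelationJoin {

  record Order(int orderId, String product) {}

  record Product(String name, int supplierId) {}

  static <S, R, K, O> Iterator<O> join(
      Iterator<S> stream,
      Iterable<R> relation,
      Function<S, K> streamKey,
      Function<R, K> relationKey,
      BiFunction<S, R, O> combine) {
    // Build side: the finite relation, so memory use is bounded by its size.
    Map<K, List<R>> lookup = new HashMap<>();
    for (R row : relation) {
      lookup.computeIfAbsent(relationKey.apply(row), k -> new ArrayList<>()).add(row);
    }
    // Probe side: the (possibly unbounded) stream is consumed one row at a time.
    return new Iterator<O>() {
      private Iterator<R> matches = Collections.emptyIterator();
      private S current;

      @Override
      public boolean hasNext() {
        // Advance the stream until the current row has an unconsumed match.
        while (!matches.hasNext()) {
          if (!stream.hasNext()) {
            return false;
          }
          current = stream.next();
          matches = lookup.getOrDefault(streamKey.apply(current), List.of()).iterator();
        }
        return true;
      }

      @Override
      public O next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return combine.apply(current, matches.next());
      }
    };
  }

  public static void main(String[] args) {
    List<Product> products = List.of(new Product("paint", 1), new Product("brush", 2));
    // An infinite stream of orders; "glue" has no matching product and is dropped.
    Iterator<Order> orders = Stream.iterate(0, i -> i + 1)
        .map(i -> new Order(i, i % 3 == 0 ? "paint" : i % 3 == 1 ? "brush" : "glue"))
        .iterator();
    Iterator<String> joined = join(orders, products, Order::product, Product::name,
        (o, p) -> "order " + o.orderId() + " -> supplier " + p.supplierId());
    for (int n = 0; n < 4 && joined.hasNext(); n++) {
      System.out.println(joined.next());
    }
  }
}
```

Swapping the roles (hashing the stream) would require updating the lookup table as new rows arrive, which is the part Rui describes as difficult; this sketch sidesteps that by never hashing the stream.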
