Possibly we could also rely on RelOptCostFactory#makeInfiniteCost and
RelOptCost#isInfinite.
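
To make the idea concrete, here is a rough sketch in plain Java. It does not use Calcite classes: Input, buildCost and chooseBuildSide are made-up names for illustration only, and the infinite double merely stands in for what makeInfiniteCost/isInfinite would express in the planner. The assumption is that building a hash table out of a stream costs "infinity", so the non-stream side gets picked as the build side regardless of the estimated row counts.

```java
// Hypothetical stand-ins for illustration; none of these are Calcite classes.
final class StreamAwareJoinSketch {

    /** A join input with an estimated row count and a flag for unbounded (stream) inputs. */
    static final class Input {
        final String name;
        final double rowCount;
        final boolean isStream;

        Input(String name, double rowCount, boolean isStream) {
            this.name = name;
            this.rowCount = rowCount;
            this.isStream = isStream;
        }
    }

    /** Cost of materializing an input as the hash table: infinite for a stream. */
    static double buildCost(Input input) {
        return input.isStream ? Double.POSITIVE_INFINITY : input.rowCount;
    }

    /** Picks the input with the cheaper build cost; fails if both sides are streams. */
    static Input chooseBuildSide(Input left, Input right) {
        double leftCost = buildCost(left);
        double rightCost = buildCost(right);
        if (Double.isInfinite(leftCost) && Double.isInfinite(rightCost)) {
            throw new IllegalStateException("cannot build a hash table: both inputs are streams");
        }
        return leftCost <= rightCost ? left : right;
    }

    public static void main(String[] args) {
        // Row counts taken from the plan quoted below; ORDERS is the stream.
        Input products = new Input("PRODUCTS", 200.0, false);
        Input orders = new Input("ORDERS", 100.0, true);
        // ORDERS has the smaller row count, but its build cost is infinite.
        System.out.println(chooseBuildSide(products, orders).name);
    }
}
```

With the row counts from the quoted plan, this picks PRODUCTS as the build side even though ORDERS has the smaller estimate, which is the behaviour the rowcount workaround tries to get indirectly.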

Best,
Stamatis

On Fri, Jan 3, 2020 at 8:04 PM Rui Wang <[email protected]> wrote:

> +1 on having a property on RelNode that indicates whether a node is a stream
> or a non-stream. In Apache Beam SQL's practice, stream joins are already
> metadata-driven: if one side is a stream and the other side is a
> non-stream, we use a hash-join-like implementation but build the hash table
> on the non-stream side.
>
>
> Technically, it is feasible to build the hash table on the stream side even
> if it is infinite (I guess the rationale is that the stream is very small
> and could be joined against TB-level non-stream data). New stream data will
> update the hash table. In this case, the implementation has to update data
> accordingly based on the newly arriving data, which turned out to be
> difficult to implement.
>
>
> -Rui
>
> On Fri, Jan 3, 2020 at 10:19 AM Vladimir Sitnikov <
> [email protected]> wrote:
>
> > Hi,
> >
> > Stream tables do not play very well with hash joins.
> > In other words, if a hash join tried to build a lookup table out of a
> > stream, it could just run out of memory.
> >
> > Is there metadata or something like that to identify stream-like inputs,
> > so that a hash join can ensure it does not try to build a lookup table
> > out of the stream?
> >
> > The case is org.apache.calcite.test.StreamTest#testStreamToRelationJoin
> > which transforms to the following.
> > The plan is wrong because it would build a hash lookup table out of the
> > second input, which happens to be a stream (infinite?) (STREAM).
> >
> > As a temporary workaround, I will increase the estimated rowcount for
> > the orders table to 100'000, but it would be nice to make those decisions
> > metadata-driven.
> >
> > EnumerableProject(ROWTIME=[$2], ORDERID=[$3], SUPPLIERID=[$1]): rowcount = 3000.0, cumulative cost = {6950.0 rows, 9650.0 cpu, 0.0 io}, id = 603
> >   EnumerableHashJoin(condition=[=($0, $6)], joinType=[inner]): rowcount = 3000.0, cumulative cost = {3950.0 rows, 650.0 cpu, 0.0 io}, id = 602
> >     EnumerableInterpreter: rowcount = 200.0, cumulative cost = {100.0 rows, 100.0 cpu, 0.0 io}, id = 599
> >       BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]]): rowcount = 200.0, cumulative cost = {2.0 rows, 2.0100000000000002 cpu, 0.0 io}, id = 122
> >     EnumerableProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT0=[CAST($2):VARCHAR(32) NOT NULL]): rowcount = 100.0, cumulative cost = {150.0 rows, 550.0 cpu, 0.0 io}, id = 601
> >       EnumerableInterpreter: rowcount = 100.0, cumulative cost = {50.0 rows, 50.0 cpu, 0.0 io}, id = 600
> >         BindableTableScan(table=[[STREAM_JOINS, ORDERS, (STREAM)]]): rowcount = 100.0, cumulative cost = {1.0 rows, 1.01 cpu, 0.0 io}, id = 182
> >
> > Vladimir
> >
>
