+1 on having a property on RelNode that tells us whether a node is stream
or non-stream. In Apache Beam SQL, stream joins are already metadata-driven:
if one side is a stream and the other side is non-stream, we use a
hash-join-like implementation but build the hash table on the non-stream
side.
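
To illustrate the idea, here is a minimal Python sketch of a hash join that
picks its build side from a per-input flag. It is not Beam SQL's or Calcite's
code: the Input class, its is_stream field, and the hash_join function are
hypothetical names standing in for whatever metadata the planner would expose.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Hashable, Iterable, Iterator, Tuple

Row = Tuple


@dataclass
class Input:
    """A join input plus a hypothetical is_stream flag (stand-in for planner metadata)."""
    rows: Iterable[Row]
    is_stream: bool
    key: Callable[[Row], Hashable]


def hash_join(left: Input, right: Input) -> Iterator[Tuple[Row, Row]]:
    """Inner equi-join that never builds its hash table from a stream input."""
    if left.is_stream and right.is_stream:
        raise ValueError("stream-to-stream joins are out of scope for this sketch")

    # Build on the non-stream side; with two non-stream inputs, default to the right.
    build, probe = (left, right) if right.is_stream else (right, left)

    table = defaultdict(list)
    for row in build.rows:
        table[build.key(row)].append(row)

    build_is_left = build is left
    # The probe side is consumed lazily, so it may be unbounded.
    for row in probe.rows:
        for match in table.get(probe.key(row), ()):
            # Always emit (left_row, right_row), whichever side was built.
            yield (match, row) if build_is_left else (row, match)
```

Because the probe side is only iterated, an unbounded generator can be passed
as the stream input and results can be taken incrementally, for example with
itertools.islice.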


Technically, it's feasible to build the hash table on the stream side even
if the stream is infinite (I guess the rationale is that the stream is very
small and it could be joined against TB-level non-stream data). New stream
data will update the hash table. In this case, the implementation has to
update its data as new rows arrive, which turned out to be difficult to
implement.
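
A rough sketch of what "build on the stream side" could look like, to show
where the difficulty comes from. This is an assumption-laden illustration and
not Beam SQL's implementation: StreamSideHashJoin, scan_bounded, and the two
key functions are hypothetical, and the full re-scan on every new stream row
is just the simplest way to keep results complete.

```python
from collections import defaultdict
from typing import Callable, Hashable, Iterable, List, Tuple

Row = Tuple


class StreamSideHashJoin:
    """Hash table is built from stream rows and grows as new rows arrive."""

    def __init__(
        self,
        scan_bounded: Callable[[], Iterable[Row]],  # hypothetical: re-reads the non-stream side
        stream_key: Callable[[Row], Hashable],
        bounded_key: Callable[[Row], Hashable],
    ) -> None:
        self.scan_bounded = scan_bounded
        self.stream_key = stream_key
        self.bounded_key = bounded_key
        self.table = defaultdict(list)

    def on_stream_row(self, row: Row) -> List[Tuple[Row, Row]]:
        """Add a newly arrived stream row and return the join results it contributes."""
        key = self.stream_key(row)
        self.table[key].append(row)
        # The new row may match non-stream rows that were already probed, so this
        # sketch re-scans the whole non-stream side (assumed expensive at TB scale).
        return [(row, b) for b in self.scan_bounded() if self.bounded_key(b) == key]

    def probe(self, bounded_row: Row) -> List[Tuple[Row, Row]]:
        """Match one non-stream row against the stream rows seen so far."""
        return [(s, bounded_row) for s in self.table.get(self.bounded_key(bounded_row), ())]
```

The table is never final, so every arrival either triggers extra work against
the large side or leaves earlier probe results incomplete.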


-Rui

On Fri, Jan 3, 2020 at 10:19 AM Vladimir Sitnikov <
sitnikov.vladi...@gmail.com> wrote:

> Hi,
>
> Stream tables do not play very well with hash joins.
> In other words, if a hash join tried to build a lookup table out of a
> stream, it could simply run out of memory.
>
> Is there metadata or something like that to identify stream-like inputs so
> hash join would ensure it does not
> try to build a lookup table out of the stream?
>
> The case is org.apache.calcite.test.StreamTest#testStreamToRelationJoin
> which transforms to the following.
> The plan is wrong because it would build hash lookup out of the second
> input which happens to be (infinite?) (STREAM).
>
> As a temporary workaround, I will increase the estimated rowcount for
> orders table to 100'000, but it would be nice to make those decisions
> metadata-driven.
>
> EnumerableProject(ROWTIME=[$2], ORDERID=[$3], SUPPLIERID=[$1]): rowcount = 3000.0, cumulative cost = {6950.0 rows, 9650.0 cpu, 0.0 io}, id = 603
>   EnumerableHashJoin(condition=[=($0, $6)], joinType=[inner]): rowcount = 3000.0, cumulative cost = {3950.0 rows, 650.0 cpu, 0.0 io}, id = 602
>     EnumerableInterpreter: rowcount = 200.0, cumulative cost = {100.0 rows, 100.0 cpu, 0.0 io}, id = 599
>       BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]]): rowcount = 200.0, cumulative cost = {2.0 rows, 2.0100000000000002 cpu, 0.0 io}, id = 122
>     EnumerableProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT0=[CAST($2):VARCHAR(32) NOT NULL]): rowcount = 100.0, cumulative cost = {150.0 rows, 550.0 cpu, 0.0 io}, id = 601
>       EnumerableInterpreter: rowcount = 100.0, cumulative cost = {50.0 rows, 50.0 cpu, 0.0 io}, id = 600
>         BindableTableScan(table=[[STREAM_JOINS, ORDERS, (STREAM)]]): rowcount = 100.0, cumulative cost = {1.0 rows, 1.01 cpu, 0.0 io}, id = 182
>
> Vladimir
>
