> There are two tricky (but doable) parts:
> 1) pass bound variables from one convention to the other;
> 2) push the Filter (associated with the Correlate) holding the condition
> on the correlated variable to elastic;

Do you know if any projects (libraries) have already done this? I would
like to look at some code examples.

On Fri, Feb 22, 2019 at 6:36 PM Stamatis Zampetakis <[email protected]>
wrote:

> No need to mention it, I also appreciate your contributions/discussions.
>
> Block-based nested loops is not implemented but we already have tuple-based
> nested loop [1] and it is a good starting point.
>
> Regarding the particular query that you cited I think that even tuple-based
> nested loop would work really well. For each tuple in orders you will bind
> the productId and you will use it to probe product. Since you have 100
> tuples in mongo you will end up sending 100 queries in elastic each one
> though returning a single tuple corresponding to the appropriate id. There
> are two tricky (but doable) parts:
>
> 1) pass bound variables from one convention to the other;
> 2) push the Filter (associated with the Correlate) holding the condition on
> the correlated variable to elastic;
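
A minimal, language-neutral sketch of the tuple-based nested loop described
above. It is not Calcite code: `correlate_join`, `probe_products` and the
in-memory `orders` / `products` data are hypothetical stand-ins for the
Correlate operator, a filtered query against elastic, and the two tables.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]


def correlate_join(
    outer: Iterable[Row],
    probe: Callable[[object], Iterable[Row]],
    outer_key: str,
) -> Iterator[Tuple[Row, Row]]:
    """Tuple-based nested loop: probe the inner source once per outer row."""
    for outer_row in outer:
        # Part 1: bind the correlation variable from the current outer row.
        bound_value = outer_row[outer_key]
        # Part 2: the filter on the bound value is evaluated inside the
        # inner source (here, inside `probe`), not after a full scan.
        for inner_row in probe(bound_value):
            yield outer_row, inner_row


# Hypothetical in-memory stand-ins for mongo.Order and elastic.Product.
orders: List[Row] = [{"productId": i, "quantity": i + 1} for i in range(100)]
products: Dict[object, Row] = {
    i: {"id": i, "price": 10.0 * i} for i in range(1000)
}
queries_sent: List[object] = []


def probe_products(product_id: object) -> List[Row]:
    """Stands in for one filtered query (id = product_id) sent to elastic."""
    queries_sent.append(product_id)
    row = products.get(product_id)
    return [row] if row is not None else []


joined = list(correlate_join(orders, probe_products, "productId"))
```

With 100 outer rows this issues 100 probes, each returning at most one row,
which matches the cost estimate given in the message.
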
>
> Having block-based nested loop could be used to send only one query to
> Elastic instead of 100. If you start working on a block-based nested loop
> join I would be happy to help/review the code. In any case, I plan to work
> on it in the following months if I manage to find some time.
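
A sketch of the block-based variant, under the assumption that the inner
source can answer a "key is one of these values" filter in a single request.
The names `block_nested_loop_join` and `probe_products` and the sample data
are hypothetical; this is not an existing Calcite operator.

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]


def block_nested_loop_join(
    outer: Iterable[Row],
    probe_many: Callable[[List[object]], Iterable[Row]],
    outer_key: str,
    inner_key: str,
    block_size: int,
) -> Iterator[Tuple[Row, Row]]:
    """Block-based nested loop: one inner probe per block of outer rows."""
    outer_iter = iter(outer)
    while True:
        block = list(islice(outer_iter, block_size))
        if not block:
            return
        # Distinct join keys of this block, in first-seen order.
        keys = list(dict.fromkeys(row[outer_key] for row in block))
        # One request for the whole block; group the answers by key.
        matches: Dict[object, List[Row]] = {}
        for inner_row in probe_many(keys):
            matches.setdefault(inner_row[inner_key], []).append(inner_row)
        for outer_row in block:
            for inner_row in matches.get(outer_row[outer_key], []):
                yield outer_row, inner_row


# Hypothetical in-memory stand-ins for mongo.Order and elastic.Product.
orders: List[Row] = [{"productId": i, "quantity": i + 1} for i in range(100)]
products: Dict[object, Row] = {
    i: {"id": i, "price": 10.0 * i} for i in range(1000)
}
queries_sent: List[List[object]] = []


def probe_products(ids: List[object]) -> List[Row]:
    """Stands in for a single multi-key query sent to elastic."""
    queries_sent.append(list(ids))
    return [products[i] for i in ids if i in products]


joined = list(
    block_nested_loop_join(orders, probe_products, "productId", "id", 100)
)
```

With a block size of 100 the 100 orders are resolved by a single probe; a
smaller block size trades more probes for less memory per block.
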
>
> [1]
>
> https://github.com/apache/calcite/blob/c3fd74a897ca1b469b6b776baeaa3c660ce5876a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L1156
>
> On Fri, Feb 22, 2019 at 12:21 AM Andrei Sereda <[email protected]>
> wrote:
>
> > Hi Stamatis,
> >
> > As usual, appreciate your time to answer my questions.
> >
> > I will try to give more context below.
> >
> > I am not sure I understand why bloom-filters and block-based nested
> > loop are not possible for your use case
> >
> > Our destination data sources (ElasticSearch / Mongo) don’t support
> > filtering based on a provided bloom filter, and fetching everything
> > into memory is not an option for us (reference data may have up to
> > 200M records). Regarding block-based nested loops: from Vadym’s
> > previous message I understand it is not yet implemented (“Support for
> > “batched” correlated execution is likely not there”); correct me if
> > I’m wrong.
> >
> > About the physical-level implementation: can you please point me to
> > some examples of how I would introduce a custom EnumerableSemiJoin to
> > perform the following join efficiently:
> >
> > select o.quantity, p.price, p.description
> > from
> >   -- Order table is 100 records (in mongo)
> >   mongo.Order o
> >   join
> >   -- Product table is 200M records (in elastic)
> >   elastic.Product p
> >   on (o.productId = p.id)
> >
> > To me, the efficient way to do it would be to fetch 100 records from
> > the elastic.Product table first and perform the join afterwards. I
> > acknowledge that I need to better educate myself about calcite
> > physical conventions (any examples are appreciated).
> >
> > The reason I’m introducing streams into the picture is that mongo has
> > a notion of change streams
> > <https://docs.mongodb.com/manual/changeStreams/>, so potentially a
> > mongo table can also be a StreamableTable
> > <https://calcite.apache.org/docs/stream.html> (I would like to submit
> > a separate PR for this). The question of doing efficient joins for
> > streams still stands.
> >
> > Many thanks,
> > Andrei.
> >
> > On Wed, Feb 20, 2019 at 4:23 AM Stamatis Zampetakis <[email protected]>
> > wrote:
> >
> > > Hi Andrei,
> > >
> > > I am not sure I understand why bloom-filters and block-based nested
> > > loop are not possible for your use case, but I will try to provide
> > > some answers to the new questions you raised.
> > >
> > > By adding streams in the discussion I guess you add some additional
> > > limitations on one side of the join (Orders table):
> > >
> > > (i) iterators/enumerators cannot be restarted;
> > > (ii) iterators/enumerators are infinite;
> > > (iii) there are no indexes.
> > >
> > > Andrei> 1. Am I correct to assume that each event in Orders table
> > > (which is a stream) will trigger full table scan (without filter) on
> > > Products table?
> > >
> > > The limitations imposed by streams already exclude some join options,
> > > but that doesn't mean there is only one way to execute the join. So
> > > the answer to this question is, in general, no. Consider for instance
> > > that the products table is small (and fits in memory); then a hash
> > > join algorithm could be used, where you build a hash table based on
> > > products and then probe the hash table with the stream relation.
> > >
> > > I would suspect that the default rule set in Calcite chooses this join
> > > option by default (i.e., EnumerableJoin).
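
A minimal sketch of the hash-join option described above, assuming the
reference table fits in memory. `hash_join`, `order_events` and the sample
rows are hypothetical and do not reproduce EnumerableJoin; the point is only
that the stream side is read once, front to back, and never restarted.

```python
from collections import defaultdict
from itertools import count, islice
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]


def hash_join(
    stream: Iterable[Row],
    table: Iterable[Row],
    stream_key: str,
    table_key: str,
) -> Iterator[Tuple[Row, Row]]:
    """Build a hash table on the finite side, probe it with the stream."""
    index: Dict[object, List[Row]] = defaultdict(list)
    for table_row in table:  # build phase: a single scan of the table
        index[table_row[table_key]].append(table_row)
    for event in stream:  # probe phase: single pass, works if infinite
        for table_row in index.get(event[stream_key], ()):
            yield event, table_row


# Hypothetical reference data (small enough to fit in memory).
products: List[Row] = [
    {"productId": 1, "name": "pen", "unitPrice": 2.0},
    {"productId": 2, "name": "ink", "unitPrice": 5.0},
]


def order_events() -> Iterator[Row]:
    """An unbounded stream of orders; productId cycles through 2, 3, 1."""
    for order_id in count(1):
        yield {"orderId": order_id, "productId": 1 + order_id % 3, "units": 1}


joined_stream = hash_join(order_events(), products, "productId", "productId")
first_four = list(islice(joined_stream, 4))
```

Because the join is a generator, results are emitted as events arrive;
orders whose productId has no match in the table are simply dropped.
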
> > >
> > > Andrei> 2. Can I register my custom rule to rewrite the query when,
> > > say, Orders and Products tables are present to manually add a sub
> > > query?
> > >
> > > It seems that this kind of optimization is more appropriate at the
> > > physical level, so if you end up writing custom rules I think it may
> > > be better to target a physical convention (e.g., Enumerable relnodes).
> > > Moreover, it seems like you want to introduce the equivalent of
> > > semi-join reducers, so in principle you should introduce something
> > > similar to EnumerableSemiJoin. I am saying similar because the current
> > > implementation of EnumerableSemiJoin is almost the same as the
> > > EnumerableJoin (i.e., hash-join based), so I would assume that you are
> > > not going to gain much by doing so.
> > >
> > > Andrei> 3. Do I have to disable SubQueryRemoveRule in this case ?
> > >
> > > If I remember correctly the rule tries to transform correlates to
> > > joins, so I guess it depends on what kind of physical join algorithm
> > > you want to use. Correlate, a.k.a. nested loop join, can be a great
> > > alternative in some cases, as other people mentioned before.
> > >
> > > Andrei> 4. Vadym, not sure how sub-query computation will work. Can I
> > > partially execute the query and convert the subquery into
> > > EnumerableValues?
> > >
> > > Not sure what you mean here. If you write/choose (by custom rules)
> > > the algorithm yourself you can control exactly what is happening and
> > > how the query will be executed.
> > >
> > > Andrei>Is there a way to solve this problem non-generically ?
> > > We’re also hitting this limitation in Flink (which uses calcite but not
> > > calcite streams) for similar use-case.
> > >
> > > It seems that you have some concrete query plans (using built-in
> > > calcite & other operators) which are not performing well. Maybe it
> > > would be helpful if you could share the problematic plan in this
> > > discussion.
> > >
> > > Best,
> > > Stamatis
> > >
> > >
> > > On Wed, Feb 20, 2019 at 1:32 AM Andrei Sereda <[email protected]>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I would like to resurrect this thread in the context of calcite
> > > > streams. Unfortunately bloom filters are not an option for the
> > > > data sources being used.
> > > >
> > > > Say one has a stream-to-table join
> > > > <https://calcite.apache.org/docs/stream.html#joining-streams-to-tables>.
> > > > From the docs example:
> > > >
> > > > SELECT STREAM
> > > >     o.productId, o.orderId, o.units, p.name, p.unitPrice
> > > > FROM Orders AS o    -- streamable Table
> > > > JOIN Products AS p  -- reference data table
> > > >     ON o.productId = p.productId;
> > > >
> > > >
> > > >    1. Am I correct to assume that each event in Orders table (which
> > > >       is a stream) will trigger full table scan (without filter) on
> > > >       Products table?
> > > >    2. Can I register my custom rule to rewrite the query when, say,
> > > >       Orders and Products tables are present to manually add a sub
> > > >       query?
> > > >    3. Do I have to disable SubQueryRemoveRule in this case?
> > > >    4. Vadym, not sure how sub-query computation will work. Can I
> > > >       partially execute the query and convert the subquery into
> > > >       EnumerableValues?
> > > >
> > > > Is there a way to solve this problem non-generically ?
> > > >
> > > > We’re also hitting this limitation in Flink (which uses calcite but
> > > > not calcite streams) for similar use-case.
> > > >
> > > > Many Thanks,
> > > > Andrei.
> > > >
> > > > On Thu, Aug 30, 2018 at 5:27 PM Vineet Garg <[email protected]>
> > > > wrote:
> > > >
> > > > > Hive actually does this optimization (it is called semi-join
> > > > > reduction) by generating bloom-filters on one side and passing
> > > > > them on to the other side. This is not a rewrite but instead a
> > > > > physical implementation.
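
A conceptual sketch of semi-join reduction with a bloom filter. This is not
Hive's implementation; the `BloomFilter` class, its sizing parameters and the
sample data are assumptions made purely for illustration. It also presumes
the large side can evaluate the filter, which, as noted elsewhere in this
thread, is not the case for every data source.

```python
import hashlib
from typing import Dict, Iterator, List, Tuple

Row = Dict[str, object]


class BloomFilter:
    """Tiny bloom filter: false positives possible, no false negatives."""

    def __init__(self, num_bits: int = 4096, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit set

    def _positions(self, value: object) -> Iterator[int]:
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value!r}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value: object) -> None:
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value: object) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


# Hypothetical data: a small side (100 keys) and a large side (5000 rows).
small_side: List[Row] = [{"productId": i * 7} for i in range(100)]
large_side: List[Row] = [{"id": i, "price": float(i)} for i in range(5000)]

# Step 1: build the filter from the join keys of the small side.
bloom = BloomFilter()
for row in small_side:
    bloom.add(row["productId"])

# Step 2: ship the filter to the large side and drop rows that cannot match.
candidates = [row for row in large_side if bloom.might_contain(row["id"])]

# Step 3: run the exact join on the reduced input; false positives that
# slipped through the filter are eliminated here.
by_id = {row["id"]: row for row in candidates}
joined: List[Tuple[Row, Row]] = [
    (o, by_id[o["productId"]]) for o in small_side if o["productId"] in by_id
]
```

The filter only reduces the input of the join; correctness still comes from
the exact key comparison in the last step.
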
> > > > >
> > > > > Vineet
> > > > >
> > > > > On Aug 29, 2018, at 10:34 AM, Vladimir Sitnikov
> > > > > <[email protected]> wrote:
> > > > >
> > > > > Nested loops are never likely to happe
> > > > >
> > > > > What's wrong with that?
> > > > >
> > > > > Apparently Andrei asks for that, and "subquery precomputation" is
> > quite
> > > > > close to nested loops in my opinion.
> > > > >
> > > > > Vladimir
> > > > >
> > > > >
> > > >
> > >
> >
>
