We did this in the project that I am currently working on but this part is
not open source so I don't have a link to give you.

Στις Σάβ, 23 Φεβ 2019 στις 9:35 μ.μ., ο/η Andrei Sereda <[email protected]>
έγραψε:

> > There are two tricky (but doable) parts:
> > 1) pass bound variables from one convention to the other;
> > 2) push the Filter (associated with the Correlate) holding the condition
> on the correlated variable to elastic;
>
> Do you know if any projects (libraries) already did this ? I would like to
> look at some code examples.
>
> On Fri, Feb 22, 2019 at 6:36 PM Stamatis Zampetakis <[email protected]>
> wrote:
>
> > No need to mention it, I also appreciate your contributions/discussions.
> >
> > Block-based nested loops is not implemented but we already have
> tuple-based
> > nested loop [1] and it is a good starting point.
> >
> > Regarding the particular query that you cited I think that even
> tuple-based
> > nested loop would work really well. For each tuple in orders you will
> bind
> > the productId and you will use it to probe product. Since you have 100
> > tuples in mongo you will end up sending 100 queries in elastic each one
> > though returning a single tuple corresponding to the appropriate id.
> There
> > are two tricky (but doable) parts:
> >
> > 1) pass bound variables from one convention to the other;
> > 2) push the Filter (associated with the Correlate) holding the condition
> on
> > the correlated variable to elastic;
> >
> > Having block-based nested loop could be used to send only one query to
> > Elastic instead of 100. If you start working on a block-based nested loop
> > join I would be happy to help/review the code. In any case, I plan to
> work
> > on it in the following months if I manage to find some time.
> >
> > [1]
> >
> >
> https://github.com/apache/calcite/blob/c3fd74a897ca1b469b6b776baeaa3c660ce5876a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L1156
> >
> > Στις Παρ, 22 Φεβ 2019 στις 12:21 π.μ., ο/η Andrei Sereda
> <[email protected]
> > >
> > έγραψε:
> >
> > > Hi Stamatis,
> > >
> > > As usual, appreciate your time to answer my questions.
> > >
> > > I will try to give more context below.
> > >
> > > I am not I understand why bloom-filters and block based nested loop are
> > not
> > > possible for your use case
> > >
> > > Our destination data-source (ElasticSearch / Mongo) doesn’t support
> > > filtering based on provided bloom filter and fetching everything in
> > memory
> > > is not an option for us (reference data may have up to 200M records).
> > > Regarding block-based nested loops. From previous Vadym’s message I
> > > understand it is not yet implemented “Support for “batched” correlated
> > > execution is likely not there” (correct me if I’m wrong).
> > >
> > > About physical level implementation. Can you please point me to some
> > > examples of how I would introduce custom EnumerableSemiJoin to perform
> > the
> > > following join (efficiently) :
> > >
> > > select o.quantity, p.price, p.descriptionfrom
> > >   -- Order table is 100 records (in mongo)
> > >   mongo.Order o    join
> > >   -- Product table is 200M records (in elastic)
> > >   elastic.Product p   on (o.productId = p.id)
> > >
> > > To me, the efficient way to do it would be fetching 100 records from
> > > elastic.Product table first and perform join afterwards. I acknowledge
> > that
> > > I need to better educate myself about calcite physical conventions (any
> > > examples are appreciated).
> > >
> > > The reason I’m introducing streams into picture is because mongo has
> > notion
> > > of change streams <https://docs.mongodb.com/manual/changeStreams/> so
> > > potentially mongo table can also be a StreamableTable
> > > <https://calcite.apache.org/docs/stream.html> (I would like to submit
> a
> > > separate PR for this). Doing efficient joins for streams still stands.
> > >
> > > Many thanks,
> > > Andrei.
> > >
> > > On Wed, Feb 20, 2019 at 4:23 AM Stamatis Zampetakis <[email protected]
> >
> > > wrote:
> > >
> > > > Hi Andrei,
> > > >
> > > > I am not I understand why bloom-filters and block based nested loop
> are
> > > not
> > > > possible for your use case but I will try to provide some answers to
> > the
> > > > new questions you raised.
> > > >
> > > > By adding streams in the discussion I guess you add some additional
> > > > limitations on one side of the join (Orders table):
> > > >
> > > > (i) iterators/enumerators cannot be restarted;
> > > > (ii) iterators/enumerators are infinite;
> > > > (iii) there are no indexes.
> > > >
> > > > Andrei> 1. Am I correct to assume that each event in Orders table
> > (which
> > > is
> > > > a stream) will trigger full table scan (without filter) on Products
> > > table?
> > > >
> > > > The limitations imposed by streams already exclude some join options
> > but
> > > > still it doesn't mean that there is only one way to execute the join.
> > > > So the answer to this question is in general no. Consider for
> instance
> > > that
> > > > the products table is small (and fits in memory) then a
> > > > hash join algorithm could be used where you build a hash table based
> on
> > > > products and then probe the hash table with the stream relation.
> > > >
> > > > I would suspect that the default rule set in Calcite chooses this
> join
> > > > option by default (i.e., EnumerableJoin).
> > > >
> > > > Andrei> 2. Can I register my custom rule to rewrite the query when,
> > say,
> > > > Orders and Products tables are present to manually add a sub query ?
> > > >
> > > > It seems that this kind of optimizations are more appropriate at the
> > > > physical level so if you end up writting custom rules I think it may
> be
> > > > better
> > > > to target a physical convention (e.g., Enumerable relnodes).
> Moreover,
> > it
> > > > seems like you want to introduce the equivalent of semi-join reducers
> > so
> > > > in principle you should introduce something similar to
> > > EnumerableSemiJoin.
> > > > I am saying similar because the current implementation of
> > > > EnumerableSemiJoin
> > > > is almost the same to the EnumerableJoin (i.e., hash-join based) so I
> > > would
> > > > assume that you are not going to gain match by doing so.
> > > >
> > > > Andrei> 3. Do I have to disable SubQueryRemoveRule in this case ?
> > > >
> > > > If I remember correctly the rule tries to transform correlates to
> joins
> > > so
> > > > I guess it depends on what kind of physical join algorithm you want
> to
> > > use.
> > > > Correlate aka., nested loop join can be a great alternative in some
> > cases
> > > > as other people mentioned before.
> > > >
> > > > Andrei> 4. Vadym, not sure how sub-query computation will work. Can I
> > > > partially execute the query and convert the subquery into
> > > EnumerableValues
> > > > ?
> > > >
> > > > Not sure what do you mean here. If you write/choose (by custom rules)
> > the
> > > > algorithm your self you can control exactly what is happening and how
> > the
> > > > query will be executed.
> > > >
> > > > Andrei>Is there a way to solve this problem non-generically ?
> > > > We’re also hitting this limitation in Flink (which uses calcite but
> not
> > > > calcite streams) for similar use-case.
> > > >
> > > > It seems that you have some concrete query plans (using built-in
> > calcite
> > > &
> > > > other operators) which are not performing well.
> > > > Maybe it would be helpful if you could share the problematic plan in
> > this
> > > > discussion.
> > > >
> > > > Best,
> > > > Stamatis
> > > >
> > > >
> > > > Στις Τετ, 20 Φεβ 2019 στις 1:32 π.μ., ο/η Andrei Sereda
> > <[email protected]
> > > >
> > > > έγραψε:
> > > >
> > > > > Hello,
> > > > >
> > > > > I would like to resurrect this thread in the context of calcite
> > > streams.
> > > > > Unfortunately bloom-filters is not an option for the data-sources
> > being
> > > > > used.
> > > > >
> > > > > Say one has stream to table join
> > > > > <
> > https://calcite.apache.org/docs/stream.html#joining-streams-to-tables
> > > >.
> > > > > From docs example:
> > > > >
> > > > > SELECT STREAM
> > > > >     o.productId, o.orderId, o.units, p.name, p.unitPrice FROM
> Orders
> > > > > AS o -- streamable Table JOIN Products AS p -- reference data table
> > > > >     ON o.productId = p.productId;
> > > > >
> > > > >
> > > > >    1. Am I correct to assume that each event in Orders table (which
> > is
> > > a
> > > > >    stream) will trigger full table scan (without filter) on
> Products
> > > > table
> > > > >    ?
> > > > >    2. Can I register my custom rule to rewrite the query when, say,
> > > > Orders
> > > > >    and Products tables are present to manually add a sub query ?
> > > > >    3. Do I have to disable SubQueryRemoveRule in this case ?
> > > > >    4. Vadym, not sure how sub-query computation will work. Can I
> > > > partially
> > > > >    execute the query and convert the subquery into
> EnumerableValues ?
> > > > >
> > > > > Is there a way to solve this problem non-generically ?
> > > > >
> > > > > We’re also hitting this limitation in Flink (which uses calcite but
> > not
> > > > > calcite streams) for similar use-case.
> > > > >
> > > > > Many Thanks,
> > > > > Andrei.
> > > > >
> > > > > On Thu, Aug 30, 2018 at 5:27 PM Vineet Garg <[email protected]
> >
> > > > wrote:
> > > > >
> > > > > > Hive actually does this optimization (it is called semi-join
> > > reduction)
> > > > > by
> > > > > > generating bloom-filters on one side and passing it on to the
> other
> > > > side.
> > > > > > This is not a rewrite but instead a physical implementation.
> > > > > >
> > > > > > Vineet
> > > > > >
> > > > > > On Aug 29, 2018, at 10:34 AM, Vladimir Sitnikov <
> > > > > > [email protected]<mailto:[email protected]>>
> > > > wrote:
> > > > > >
> > > > > > Nested loops are never likely to happe
> > > > > >
> > > > > > What's wrong with that?
> > > > > >
> > > > > > Apparently Andrei asks for that, and "subquery precomputation" is
> > > quite
> > > > > > close to nested loops in my opinion.
> > > > > >
> > > > > > Vladimir
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to