We did this in the project that I am currently working on, but that part is not open source, so I don't have a link to give you.

On Sat, 23 Feb 2019 at 9:35 PM, Andrei Sereda <[email protected]> wrote:

> There are two tricky (but doable) parts:
>
> 1) pass bound variables from one convention to the other;
>
> 2) push the Filter (associated with the Correlate) holding the condition
> on the correlated variable to elastic;
>
> Do you know if any projects (libraries) have already done this? I would
> like to look at some code examples.
>
> On Fri, Feb 22, 2019 at 6:36 PM Stamatis Zampetakis <[email protected]>
> wrote:
>
> > No need to mention it, I also appreciate your contributions/discussions.
> >
> > Block-based nested loop is not implemented, but we already have a
> > tuple-based nested loop [1] and it is a good starting point.
> >
> > Regarding the particular query that you cited, I think that even a
> > tuple-based nested loop would work really well. For each tuple in orders
> > you will bind the productId and you will use it to probe product. Since
> > you have 100 tuples in mongo you will end up sending 100 queries to
> > elastic, each one returning a single tuple corresponding to the
> > appropriate id. There are two tricky (but doable) parts:
> >
> > 1) pass bound variables from one convention to the other;
> > 2) push the Filter (associated with the Correlate) holding the condition
> > on the correlated variable to elastic;
> >
> > A block-based nested loop could be used to send only one query to
> > Elastic instead of 100. If you start working on a block-based nested loop
> > join I would be happy to help/review the code. In any case, I plan to
> > work on it in the following months if I manage to find some time.
> >
> > [1]
> > https://github.com/apache/calcite/blob/c3fd74a897ca1b469b6b776baeaa3c660ce5876a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L1156
> >
> > On Fri, 22 Feb 2019 at 12:21 AM, Andrei Sereda <[email protected]> wrote:
> >
> > > Hi Stamatis,
> > >
> > > As usual, appreciate your time to answer my questions.
> > >
> > > I will try to give more context below.
> > >
> > > I am not sure I understand why bloom-filters and block based nested
> > > loop are not possible for your use case
> > >
> > > Our destination data-source (ElasticSearch / Mongo) doesn’t support
> > > filtering based on a provided bloom filter, and fetching everything in
> > > memory is not an option for us (reference data may have up to 200M
> > > records). Regarding block-based nested loops: from Vadym’s previous
> > > message I understand it is not yet implemented, “Support for “batched”
> > > correlated execution is likely not there” (correct me if I’m wrong).
> > >
> > > About physical level implementation. Can you please point me to some
> > > examples of how I would introduce a custom EnumerableSemiJoin to
> > > perform the following join (efficiently):
> > >
> > > select o.quantity, p.price, p.description from
> > >   -- Order table is 100 records (in mongo)
> > >   mongo.Order o join
> > >   -- Product table is 200M records (in elastic)
> > >   elastic.Product p on (o.productId = p.id)
> > >
> > > To me, the efficient way to do it would be fetching 100 records from
> > > the elastic.Product table first and performing the join afterwards. I
> > > acknowledge that I need to better educate myself about calcite physical
> > > conventions (any examples are appreciated).
> > >
> > > The reason I’m introducing streams into the picture is that mongo has
> > > a notion of change streams
> > > <https://docs.mongodb.com/manual/changeStreams/> so potentially a mongo
> > > table can also be a StreamableTable
> > > <https://calcite.apache.org/docs/stream.html> (I would like to submit a
> > > separate PR for this). Doing efficient joins for streams still stands.
> > >
> > > Many thanks,
> > > Andrei.
> > >
> > > On Wed, Feb 20, 2019 at 4:23 AM Stamatis Zampetakis <[email protected]>
> > > wrote:
> > >
> > > > Hi Andrei,
> > > >
> > > > I am not sure I understand why bloom-filters and block based nested
> > > > loop are not possible for your use case but I will try to provide
> > > > some answers to the new questions you raised.
> > > >
> > > > By adding streams to the discussion I guess you add some additional
> > > > limitations on one side of the join (Orders table):
> > > >
> > > > (i) iterators/enumerators cannot be restarted;
> > > > (ii) iterators/enumerators are infinite;
> > > > (iii) there are no indexes.
> > > >
> > > > Andrei> 1. Am I correct to assume that each event in Orders table
> > > > (which is a stream) will trigger full table scan (without filter) on
> > > > Products table?
> > > >
> > > > The limitations imposed by streams already exclude some join options,
> > > > but it still doesn't mean that there is only one way to execute the
> > > > join. So the answer to this question is in general no. Consider for
> > > > instance that the products table is small (and fits in memory); then a
> > > > hash join algorithm could be used where you build a hash table based
> > > > on products and then probe the hash table with the stream relation.
> > > >
> > > > I would suspect that the default rule set in Calcite chooses this
> > > > join option by default (i.e., EnumerableJoin).
> > > >
> > > > Andrei> 2.
> > > > Can I register my custom rule to rewrite the query when, say, Orders
> > > > and Products tables are present to manually add a sub query ?
> > > >
> > > > It seems that this kind of optimization is more appropriate at the
> > > > physical level, so if you end up writing custom rules I think it may
> > > > be better to target a physical convention (e.g., Enumerable
> > > > relnodes). Moreover, it seems like you want to introduce the
> > > > equivalent of semi-join reducers, so in principle you should
> > > > introduce something similar to EnumerableSemiJoin. I am saying
> > > > similar because the current implementation of EnumerableSemiJoin is
> > > > almost the same as the EnumerableJoin (i.e., hash-join based), so I
> > > > would assume that you are not going to gain much by doing so.
> > > >
> > > > Andrei> 3. Do I have to disable SubQueryRemoveRule in this case ?
> > > >
> > > > If I remember correctly the rule tries to transform correlates to
> > > > joins, so I guess it depends on what kind of physical join algorithm
> > > > you want to use. Correlate, aka nested loop join, can be a great
> > > > alternative in some cases as other people mentioned before.
> > > >
> > > > Andrei> 4. Vadym, not sure how sub-query computation will work. Can I
> > > > partially execute the query and convert the subquery into
> > > > EnumerableValues ?
> > > >
> > > > Not sure what you mean here. If you write/choose (by custom rules)
> > > > the algorithm yourself you can control exactly what is happening and
> > > > how the query will be executed.
> > > >
> > > > Andrei> Is there a way to solve this problem non-generically ?
> > > > We’re also hitting this limitation in Flink (which uses calcite but
> > > > not calcite streams) for similar use-case.
> > > >
> > > > It seems that you have some concrete query plans (using built-in
> > > > calcite & other operators) which are not performing well.
> > > > Maybe it would be helpful if you could share the problematic plan in
> > > > this discussion.
> > > >
> > > > Best,
> > > > Stamatis
> > > >
> > > > On Wed, 20 Feb 2019 at 1:32 AM, Andrei Sereda <[email protected]> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I would like to resurrect this thread in the context of calcite
> > > > > streams. Unfortunately bloom-filters are not an option for the
> > > > > data-sources being used.
> > > > >
> > > > > Say one has a stream to table join
> > > > > <https://calcite.apache.org/docs/stream.html#joining-streams-to-tables>.
> > > > > From the docs example:
> > > > >
> > > > > SELECT STREAM
> > > > >   o.productId, o.orderId, o.units, p.name, p.unitPrice
> > > > > FROM Orders AS o -- streamable Table
> > > > > JOIN Products AS p -- reference data table
> > > > >   ON o.productId = p.productId;
> > > > >
> > > > > 1. Am I correct to assume that each event in Orders table (which
> > > > >    is a stream) will trigger full table scan (without filter) on
> > > > >    Products table ?
> > > > > 2. Can I register my custom rule to rewrite the query when, say,
> > > > >    Orders and Products tables are present to manually add a sub
> > > > >    query ?
> > > > > 3. Do I have to disable SubQueryRemoveRule in this case ?
> > > > > 4. Vadym, not sure how sub-query computation will work. Can I
> > > > >    partially execute the query and convert the subquery into
> > > > >    EnumerableValues ?
> > > > >
> > > > > Is there a way to solve this problem non-generically ?
> > > > >
> > > > > We’re also hitting this limitation in Flink (which uses calcite
> > > > > but not calcite streams) for similar use-case.
> > > > >
> > > > > Many Thanks,
> > > > > Andrei.
> > > > >
> > > > > On Thu, Aug 30, 2018 at 5:27 PM Vineet Garg <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hive actually does this optimization (it is called semi-join
> > > > > > reduction) by generating bloom-filters on one side and passing
> > > > > > it on to the other side. This is not a rewrite but instead a
> > > > > > physical implementation.
> > > > > >
> > > > > > Vineet
> > > > > >
> > > > > > On Aug 29, 2018, at 10:34 AM, Vladimir Sitnikov
> > > > > > <[email protected]> wrote:
> > > > > >
> > > > > > Nested loops are never likely to happe
> > > > > >
> > > > > > What's wrong with that?
> > > > > >
> > > > > > Apparently Andrei asks for that, and "subquery precomputation"
> > > > > > is quite close to nested loops in my opinion.
> > > > > >
> > > > > > Vladimir
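
The difference between the tuple-based and the block-based nested loop discussed in the thread above can be sketched outside Calcite roughly as follows. This is only an illustration in Python, not Calcite code: FakeProductStore, fetch_by_ids and both join functions are hypothetical names, and the in-memory store merely stands in for a remote source such as elastic.Product that can answer a filter on id. The sketch assumes the remote side accepts a list of ids in a single filtered query.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]
Joined = Tuple[object, object, object]


class FakeProductStore:
    """Hypothetical stand-in for a remote table such as elastic.Product."""

    def __init__(self, rows: Iterable[Row]) -> None:
        self._by_id = {row["id"]: row for row in rows}
        self.queries_sent = 0  # counts simulated round trips

    def fetch_by_ids(self, ids: List[object]) -> List[Row]:
        """Simulates one filtered remote query of the form: id IN (ids)."""
        self.queries_sent += 1
        return [self._by_id[i] for i in ids if i in self._by_id]


def tuple_nested_loop_join(orders: Iterable[Row],
                           store: FakeProductStore) -> Iterator[Joined]:
    """Binds productId from each order and probes the store once per order."""
    for order in orders:
        for product in store.fetch_by_ids([order["productId"]]):
            yield order["quantity"], product["price"], product["description"]


def _probe_block(block: List[Row], store: FakeProductStore) -> Iterator[Joined]:
    # Deduplicate the bound ids while keeping their first-seen order.
    ids = list(dict.fromkeys(order["productId"] for order in block))
    products = {p["id"]: p for p in store.fetch_by_ids(ids)}
    for order in block:
        product = products.get(order["productId"])
        if product is not None:
            yield order["quantity"], product["price"], product["description"]


def block_nested_loop_join(orders: Iterable[Row], store: FakeProductStore,
                           block_size: int) -> Iterator[Joined]:
    """Collects block_size orders, then probes the store once per block."""
    block: List[Row] = []
    for order in orders:
        block.append(order)
        if len(block) == block_size:
            yield from _probe_block(block, store)
            block = []
    if block:  # flush the last, possibly smaller, block
        yield from _probe_block(block, store)
```

With 100 orders, the tuple-based variant issues 100 simulated queries, while the block-based variant with block_size=100 issues one, and both produce the same joined rows in the same order.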
