No need to mention it, I also appreciate your contributions/discussions. Block-based nested loops is not implemented but we already have tuple-based nested loop [1] and it is a good starting point.
Regarding the particular query that you cited I think that even tuple-based nested loop would work really well. For each tuple in orders you will bind the productId and you will use it to probe product. Since you have 100 tuples in mongo you will end up sending 100 queries in elastic each one though returning a single tuple corresponding to the appropriate id. There are two tricky (but doable) parts: 1) pass bound variables from one convention to the other; 2) push the Filter (associated with the Correlate) holding the condition on the correlated variable to elastic; Having block-based nested loop could be used to send only one query to Elastic instead of 100. If you start working on a block-based nested loop join I would be happy to help/review the code. In any case, I plan to work on it in the following months if I manage to find some time. [1] https://github.com/apache/calcite/blob/c3fd74a897ca1b469b6b776baeaa3c660ce5876a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L1156 Στις Παρ, 22 Φεβ 2019 στις 12:21 π.μ., ο/η Andrei Sereda <[email protected]> έγραψε: > Hi Stamatis, > > As usual, appreciate your time to answer my questions. > > I will try to give more context below. > > I am not I understand why bloom-filters and block based nested loop are not > possible for your use case > > Our destination data-source (ElasticSearch / Mongo) doesn’t support > filtering based on provided bloom filter and fetching everything in memory > is not an option for us (reference data may have up to 200M records). > Regarding block-based nested loops. From previous Vadym’s message I > understand it is not yet implemented “Support for “batched” correlated > execution is likely not there” (correct me if I’m wrong). > > About physical level implementation. Can you please point me to some > examples of how I would introduce custom EnumerableSemiJoin to perform the > following join (efficiently) : > > select o.quantity, p.price, p.descriptionfrom > -- Order table is 100 records (in mongo) > mongo.Order o join > -- Product table is 200M records (in elastic) > elastic.Product p on (o.productId = p.id) > > To me, the efficient way to do it would be fetching 100 records from > elastic.Product table first and perform join afterwards. I acknowledge that > I need to better educate myself about calcite physical conventions (any > examples are appreciated). > > The reason I’m introducing streams into picture is because mongo has notion > of change streams <https://docs.mongodb.com/manual/changeStreams/> so > potentially mongo table can also be a StreamableTable > <https://calcite.apache.org/docs/stream.html> (I would like to submit a > separate PR for this). Doing efficient joins for streams still stands. > > Many thanks, > Andrei. > > On Wed, Feb 20, 2019 at 4:23 AM Stamatis Zampetakis <[email protected]> > wrote: > > > Hi Andrei, > > > > I am not I understand why bloom-filters and block based nested loop are > not > > possible for your use case but I will try to provide some answers to the > > new questions you raised. > > > > By adding streams in the discussion I guess you add some additional > > limitations on one side of the join (Orders table): > > > > (i) iterators/enumerators cannot be restarted; > > (ii) iterators/enumerators are infinite; > > (iii) there are no indexes. > > > > Andrei> 1. Am I correct to assume that each event in Orders table (which > is > > a stream) will trigger full table scan (without filter) on Products > table? > > > > The limitations imposed by streams already exclude some join options but > > still it doesn't mean that there is only one way to execute the join. > > So the answer to this question is in general no. Consider for instance > that > > the products table is small (and fits in memory) then a > > hash join algorithm could be used where you build a hash table based on > > products and then probe the hash table with the stream relation. > > > > I would suspect that the default rule set in Calcite chooses this join > > option by default (i.e., EnumerableJoin). > > > > Andrei> 2. Can I register my custom rule to rewrite the query when, say, > > Orders and Products tables are present to manually add a sub query ? > > > > It seems that this kind of optimizations are more appropriate at the > > physical level so if you end up writting custom rules I think it may be > > better > > to target a physical convention (e.g., Enumerable relnodes). Moreover, it > > seems like you want to introduce the equivalent of semi-join reducers so > > in principle you should introduce something similar to > EnumerableSemiJoin. > > I am saying similar because the current implementation of > > EnumerableSemiJoin > > is almost the same to the EnumerableJoin (i.e., hash-join based) so I > would > > assume that you are not going to gain match by doing so. > > > > Andrei> 3. Do I have to disable SubQueryRemoveRule in this case ? > > > > If I remember correctly the rule tries to transform correlates to joins > so > > I guess it depends on what kind of physical join algorithm you want to > use. > > Correlate aka., nested loop join can be a great alternative in some cases > > as other people mentioned before. > > > > Andrei> 4. Vadym, not sure how sub-query computation will work. Can I > > partially execute the query and convert the subquery into > EnumerableValues > > ? > > > > Not sure what do you mean here. If you write/choose (by custom rules) the > > algorithm your self you can control exactly what is happening and how the > > query will be executed. > > > > Andrei>Is there a way to solve this problem non-generically ? > > We’re also hitting this limitation in Flink (which uses calcite but not > > calcite streams) for similar use-case. > > > > It seems that you have some concrete query plans (using built-in calcite > & > > other operators) which are not performing well. > > Maybe it would be helpful if you could share the problematic plan in this > > discussion. > > > > Best, > > Stamatis > > > > > > Στις Τετ, 20 Φεβ 2019 στις 1:32 π.μ., ο/η Andrei Sereda <[email protected] > > > > έγραψε: > > > > > Hello, > > > > > > I would like to resurrect this thread in the context of calcite > streams. > > > Unfortunately bloom-filters is not an option for the data-sources being > > > used. > > > > > > Say one has stream to table join > > > <https://calcite.apache.org/docs/stream.html#joining-streams-to-tables > >. > > > From docs example: > > > > > > SELECT STREAM > > > o.productId, o.orderId, o.units, p.name, p.unitPrice FROM Orders > > > AS o -- streamable Table JOIN Products AS p -- reference data table > > > ON o.productId = p.productId; > > > > > > > > > 1. Am I correct to assume that each event in Orders table (which is > a > > > stream) will trigger full table scan (without filter) on Products > > table > > > ? > > > 2. Can I register my custom rule to rewrite the query when, say, > > Orders > > > and Products tables are present to manually add a sub query ? > > > 3. Do I have to disable SubQueryRemoveRule in this case ? > > > 4. Vadym, not sure how sub-query computation will work. Can I > > partially > > > execute the query and convert the subquery into EnumerableValues ? > > > > > > Is there a way to solve this problem non-generically ? > > > > > > We’re also hitting this limitation in Flink (which uses calcite but not > > > calcite streams) for similar use-case. > > > > > > Many Thanks, > > > Andrei. > > > > > > On Thu, Aug 30, 2018 at 5:27 PM Vineet Garg <[email protected]> > > wrote: > > > > > > > Hive actually does this optimization (it is called semi-join > reduction) > > > by > > > > generating bloom-filters on one side and passing it on to the other > > side. > > > > This is not a rewrite but instead a physical implementation. > > > > > > > > Vineet > > > > > > > > On Aug 29, 2018, at 10:34 AM, Vladimir Sitnikov < > > > > [email protected]<mailto:[email protected]>> > > wrote: > > > > > > > > Nested loops are never likely to happe > > > > > > > > What's wrong with that? > > > > > > > > Apparently Andrei asks for that, and "subquery precomputation" is > quite > > > > close to nested loops in my opinion. > > > > > > > > Vladimir > > > > > > > > > > > > > >
