Hi Luca,
If I understand you correctly, what you are looking for is a temporal table join. 
Flink has implemented this on top of Calcite, which may be a good reference. You 
can find more details in [1], [2], and [3].
- [1] https://issues.apache.org/jira/browse/CALCITE-1912
- [2] https://lists.apache.org/thread/s8rx569p6tqbh8ybomodo5w3h2rbfvkr
- [3] https://issues.apache.org/jira/browse/FLINK-12269
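
For illustration only, here is a minimal plain-Java sketch of the access
pattern you describe: filter the small side first, collect the surviving join
keys, and probe the service once with that key set. It is not Calcite or Flink
API. `LookupJoinSketch`, `Config`, `lookupJoin`, and `inMemoryService` are
hypothetical names made up for this example, and it assumes book IDs are unique.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class LookupJoinSketch {
  record Book(String id, String title) {}

  /** Same shape as the BookService in the original question. */
  interface BookService {
    List<Book> findBooksByIds(Set<String> ids);
  }

  /** Hypothetical row type for the `config` side of the example query. */
  record Config(String id, int val) {}

  /**
   * Inner join driven by the config side: filter config rows, collect their
   * join keys, probe the service once, then match rows by ID.
   * Assumes book IDs are unique.
   */
  static List<Book> lookupJoin(List<Config> configs, Predicate<Config> filter,
      BookService service) {
    List<Config> survivors = new ArrayList<>();
    Set<String> ids = new LinkedHashSet<>();
    for (Config c : configs) {
      if (filter.test(c)) {
        survivors.add(c);
        ids.add(c.id());
      }
    }
    if (ids.isEmpty()) {
      return List.of(); // no keys survived the filter, so skip the service call
    }
    Map<String, Book> booksById = new HashMap<>();
    for (Book b : service.findBooksByIds(ids)) {
      booksById.put(b.id(), b);
    }
    List<Book> joined = new ArrayList<>();
    for (Config c : survivors) {
      Book match = booksById.get(c.id());
      if (match != null) {
        joined.add(match); // one output row per matching config row
      }
    }
    return joined;
  }

  /** Hypothetical in-memory stand-in for the real service, for testing only. */
  static BookService inMemoryService(List<Book> books) {
    return ids -> {
      List<Book> found = new ArrayList<>();
      for (Book b : books) {
        if (ids.contains(b.id())) {
          found.add(b);
        }
      }
      return found;
    };
  }

  public static void main(String[] args) {
    BookService service = inMemoryService(
        List.of(new Book("a", "Book A"), new Book("b", "Book B")));
    List<Config> configs = List.of(new Config("a", 3), new Config("b", 5));
    // Mirrors: SELECT b.* FROM books b JOIN config ON b.id = config.id
    //          WHERE config.val > 4
    System.out.println(lookupJoin(configs, c -> c.val() > 4, service));
  }
}
```

The point of the sketch is that the service only ever sees the IDs that
survive the filter on the other side, which is the behavior a lookup-style
join would have to give you inside the planner.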

Best,
Dan Zou   

> On May 17, 2023, at 23:41, Luca Marchi <[email protected]> wrote:
> 
> Morning everyone, 
> in our company we are running a POC using Apache Calcite, and we would like 
> to collect some feedback from you on the scenario described below.
> 
> There is a service API that allows retrieving some `Book`s, and we would like 
> to build a table adapter on top of this service; this API 
> only accepts a set of IDs, and if no IDs are provided, no result is returned.
> 
> ```
>  interface BookService {
>    /** Returns the books matching the given IDs.
>     *
>     * <p>If no IDs are provided, no result is returned.
>     */
>    List<Book> findBooksByIds(Set<String> ids);
>  }
> 
>  record Book(String id, String title) {};
> ```
> 
> A requirement of this table is that it has to support joins, and we would like 
> to support joining by ID in an efficient way.
> 
> The goal is to define a rule that forces the query planner to always push 
> down join predicates into a table scan.
> 
> Given the following `book` table:
> 
> ```java
> /** A table which represents books, queryable only by their ID. */
> final class BookTable extends AbstractTable implements FilterableTable {
>  private final BookService service;
> 
>  BookTable(BookService service) {
>    this.service = service;
>  }
> 
>  @Override
>  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
>    return new RelDataTypeFactory.Builder(typeFactory)
>        .add("id", SqlTypeName.VARCHAR)
>        .add("title", SqlTypeName.VARCHAR)
>        .build();
>  }
> 
>  @Override
>  public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
>    Set<String> bookIds = getBooksId(filters);
>    List<Object[]> result = service.findBooksByIds(bookIds)
>        .stream()
>        .map(b -> new Object[]{b.id(), b.title()})
>        .toList();
> 
>    return Linq4j.asEnumerable(result);
>  }
> 
>  private static Set<String> getBooksId(List<RexNode> filters) {
>    if (filters.size() != 1) {
>      throw new IllegalArgumentException(
>          "Expected one filter to the ID, found: %d".formatted(filters.size()));
>    }
> 
>    RexNode filter = filters.get(0);
>    RexNode leftCondition = ((RexCall) filter).getOperands().get(0);
>    RexNode rightCondition = ((RexCall) filter).getOperands().get(1);
> 
>    if (leftCondition instanceof RexInputRef left
>        && rightCondition instanceof RexLiteral right
>        // The ID column is the first column of the row type, so its index is 0.
>        && left.getIndex() == 0) {
>      if (filter.isA(SqlKind.EQUALS)) {
>        String bookId = right.getValue2().toString();
>        return ImmutableSet.of(bookId);
>      }
>      if (filter.isA(SqlKind.SEARCH)) {
>        @SuppressWarnings("unchecked")
>        Sarg<NlsString> searchArguments = right.getValueAs(Sarg.class);
>        return searchArguments.rangeSet.asRanges().stream()
>            .map(Range::lowerEndpoint)
>            .map(NlsString::getValue)
>            .collect(toSet());
>      }
>    }
>    throw new IllegalArgumentException(
>        "Unexpected operator, found: %s".formatted(filter.getKind()));
>  }
> }
> ```
> 
> The API of the `BookService` always expects a set of IDs, and for a query 
> like the following (assuming an entry in `book` matching the ID `'a'`):
> ```sql
> WITH config (id, val) AS (
>    VALUES ('a', 3), ('b', 5)
> )
> SELECT b.* FROM books b
> INNER JOIN config ON b.id = config.id
> WHERE config.val > 4
> ```
> 
> Calcite produces the following plan:
> ```
> EnumerableCalc(expr#0..2=[{inputs}], id=[$t1], title=[$t2])
>  EnumerableMergeJoin(condition=[=($0, $1)], joinType=[inner])
>    EnumerableSort(sort0=[$0], dir0=[ASC])
>      EnumerableCalc(expr#0..1=[{inputs}], id=[$t0])
>        EnumerableTableScan(table=[[books]])
>    EnumerableSort(sort0=[$0], dir0=[ASC])
>      EnumerableTableScan(table=[[books]])
> ```
> 
> This means Calcite performs a full table scan of the `book` table, and since 
> the `RexNode` filters in the `scan` method are empty, no result is returned 
> (in this example we use a view scoped to a VALUES statement, but ideally the 
> solution we are looking for should also work for other tables).
> Under certain circumstances, Postgres generates a nested-loop query plan 
> for a join: it first selects the rows of table A matching a given condition, 
> then iterates over the retrieved rows and scans table B looking 
> for rows that match the join condition. This seems to be the behavior we 
> would like to enforce here.
> 
> In summary, we would like to implement a table which, in the case of a JOIN, 
> loads only the individual IDs through the API of our service, rather 
> than performing a full table scan.
> 
> Do you have any advice or feedback for us?
> 
> Thanks in advance.
