twalthr commented on a change in pull request #8764: [FLINK-12798][table-planner][table-api-java] Port TableEnvironment to table-api-java module URL: https://github.com/apache/flink/pull/8764#discussion_r296147825
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java ########## @@ -0,0 +1,857 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.AggregatedTable; +import org.apache.flink.table.api.FlatAggregateTable; +import org.apache.flink.table.api.GroupWindow; +import org.apache.flink.table.api.GroupWindowedTable; +import org.apache.flink.table.api.GroupedTable; +import org.apache.flink.table.api.OverWindow; +import org.apache.flink.table.api.OverWindowedTable; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.WindowGroupedTable; +import org.apache.flink.table.catalog.FunctionLookup; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionParser; +import org.apache.flink.table.expressions.LookupCallResolver; +import org.apache.flink.table.functions.TemporalTableFunction; +import org.apache.flink.table.functions.TemporalTableFunctionImpl; +import org.apache.flink.table.operations.JoinQueryOperation.JoinType; +import org.apache.flink.table.operations.OperationExpressionsUtils; +import org.apache.flink.table.operations.OperationExpressionsUtils.CategorizedExpressions; +import org.apache.flink.table.operations.OperationTreeBuilder; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * Implementation for {@link Table}. + */ +@Internal +public class TableImpl implements Table { + + private static final AtomicInteger uniqueId = new AtomicInteger(0); + + private final TableEnvironment tableEnvironment; + private final QueryOperation operationTree; + private final OperationTreeBuilder operationTreeBuilder; + private final LookupCallResolver callResolver; + + private String tableName = null; + + public TableEnvironment getTableEnvironment() { + return tableEnvironment; + } + + private TableImpl( + TableEnvironment tableEnvironment, + QueryOperation operationTree, + OperationTreeBuilder operationTreeBuilder, + LookupCallResolver callResolver) { + this.tableEnvironment = tableEnvironment; + this.operationTree = operationTree; + this.operationTreeBuilder = operationTreeBuilder; + this.callResolver = callResolver; + } + + public static TableImpl createTable( + TableEnvironment tableEnvironment, + QueryOperation operationTree, + OperationTreeBuilder operationTreeBuilder, + FunctionLookup functionCatalog) { + return new TableImpl( + tableEnvironment, + operationTree, + operationTreeBuilder, + new LookupCallResolver(functionCatalog)); + } + + @Override + public TableSchema getSchema() { + return operationTree.getTableSchema(); + } + + @Override + public void printSchema() { + System.out.println(getSchema()); + } + + @Override + public QueryOperation getQueryOperation() { + return operationTree; + } + + @Override + public Table select(String fields) { + return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + } + + @Override + public Table select(Expression... fields) { + List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields) + .map(f -> f.accept(callResolver)) + .collect(Collectors.toList()); + CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( + expressionsWithResolvedCalls + ); + + if (!extracted.getWindowProperties().isEmpty()) { + throw new ValidationException("Window properties can only be used on windowed tables."); + } + + if (!extracted.getAggregations().isEmpty()) { + QueryOperation aggregate = operationTreeBuilder.aggregate( + Collections.emptyList(), + extracted.getAggregations(), + operationTree + ); + return createTable(operationTreeBuilder.project(extracted.getProjections(), aggregate, false)); + } else { + return createTable(operationTreeBuilder.project(expressionsWithResolvedCalls, operationTree, false)); + } + } + + @Override + public TemporalTableFunction createTemporalTableFunction(String timeAttribute, String primaryKey) { + return createTemporalTableFunction( + ExpressionParser.parseExpression(timeAttribute), + ExpressionParser.parseExpression(primaryKey)); + } + + @Override + public TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey) { + Expression resolvedTimeAttribute = operationTreeBuilder.resolveExpression(timeAttribute, operationTree); + Expression resolvedPrimaryKey = operationTreeBuilder.resolveExpression(primaryKey, operationTree); + + return TemporalTableFunctionImpl.create( + operationTree, + resolvedTimeAttribute, + resolvedPrimaryKey); + } + + @Override + public Table as(String fields) { + List<Expression> fieldsExprs = ExpressionParser.parseExpressionList(fields); + return createTable(operationTreeBuilder.alias(fieldsExprs, operationTree)); + } + + @Override + public Table as(Expression... fields) { + return createTable(operationTreeBuilder.alias(Arrays.asList(fields), operationTree)); + } + + @Override + public Table filter(String predicate) { + return filter(ExpressionParser.parseExpression(predicate)); + } + + @Override + public Table filter(Expression predicate) { + Expression resolvedCallPredicate = predicate.accept(callResolver); + return createTable(operationTreeBuilder.filter(resolvedCallPredicate, operationTree)); + } + + @Override + public Table where(String predicate) { + return filter(predicate); + } + + @Override + public Table where(Expression predicate) { + return filter(predicate); + } + + @Override + public GroupedTable groupBy(String fields) { + return new GroupedTableImpl(this, ExpressionParser.parseExpressionList(fields)); + } + + @Override + public GroupedTable groupBy(Expression... fields) { + return new GroupedTableImpl(this, Arrays.asList(fields)); + } + + @Override + public Table distinct() { + return createTable(operationTreeBuilder.distinct(operationTree)); + } + + @Override + public Table join(Table right) { + return joinInternal(right, Optional.empty(), JoinType.INNER); + } + + @Override + public Table join(Table right, String joinPredicate) { + return join(right, ExpressionParser.parseExpression(joinPredicate)); + } + + @Override + public Table join(Table right, Expression joinPredicate) { + return joinInternal(right, Optional.of(joinPredicate), JoinType.INNER); + } + + @Override + public Table leftOuterJoin(Table right) { + return joinInternal(right, Optional.empty(), JoinType.LEFT_OUTER); + } + + @Override + public Table leftOuterJoin(Table right, String joinPredicate) { + return leftOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)); + } + + @Override + public Table leftOuterJoin(Table right, Expression joinPredicate) { + return joinInternal(right, Optional.of(joinPredicate), JoinType.LEFT_OUTER); + } + + @Override + public Table rightOuterJoin(Table right, String joinPredicate) { + return rightOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)); + } + + @Override + public Table rightOuterJoin(Table right, Expression joinPredicate) { + return joinInternal(right, Optional.of(joinPredicate), JoinType.RIGHT_OUTER); + } + + @Override + public Table fullOuterJoin(Table right, String joinPredicate) { + return fullOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)); + } + + @Override + public Table fullOuterJoin(Table right, Expression joinPredicate) { + return joinInternal(right, Optional.of(joinPredicate), JoinType.FULL_OUTER); + } + + private TableImpl joinInternal( + Table right, + Optional<Expression> joinPredicate, + JoinType joinType) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.join( + this.operationTree, + right.getQueryOperation(), + joinType, + joinPredicate, + false)); + } + + private void verifyTableCompatible(Table right) { + // check that the TableEnvironment of right table is not null + // and right table belongs to the same TableEnvironment + if (((TableImpl) right).getTableEnvironment() != this.tableEnvironment) { + throw new ValidationException("Only tables from the same TableEnvironment can be joined."); + } + } + + @Override + public Table joinLateral(String tableFunctionCall) { + return joinLateral(ExpressionParser.parseExpression(tableFunctionCall)); + } + + @Override + public Table joinLateral(Expression tableFunctionCall) { + return joinLateralInternal(tableFunctionCall, Optional.empty(), JoinType.INNER); + } + + @Override + public Table joinLateral(String tableFunctionCall, String joinPredicate) { + return joinLateral( + ExpressionParser.parseExpression(tableFunctionCall), + ExpressionParser.parseExpression(joinPredicate)); + } + + @Override + public Table joinLateral(Expression tableFunctionCall, Expression joinPredicate) { + return joinLateralInternal(tableFunctionCall, Optional.of(joinPredicate), JoinType.INNER); + } + + @Override + public Table leftOuterJoinLateral(String tableFunctionCall) { + return leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall)); + } + + @Override + public Table leftOuterJoinLateral(Expression tableFunctionCall) { + return joinLateralInternal(tableFunctionCall, Optional.empty(), JoinType.LEFT_OUTER); + } + + @Override + public Table leftOuterJoinLateral(String tableFunctionCall, String joinPredicate) { + return leftOuterJoinLateral( + ExpressionParser.parseExpression(tableFunctionCall), + ExpressionParser.parseExpression(joinPredicate)); + } + + @Override + public Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate) { + return joinLateralInternal(tableFunctionCall, Optional.of(joinPredicate), JoinType.LEFT_OUTER); + } + + private TableImpl joinLateralInternal( + Expression callExpr, + Optional<Expression> joinPredicate, + JoinType joinType) { + + // check join type + if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) { + throw new ValidationException( + "Table functions are currently only supported for inner and left outer lateral joins."); + } + + return createTable(operationTreeBuilder.joinLateral( + this.operationTree, + callExpr, + joinType, + joinPredicate)); + } + + @Override + public Table minus(Table right) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.minus(operationTree, right.getQueryOperation(), false)); + } + + @Override + public Table minusAll(Table right) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.minus(operationTree, right.getQueryOperation(), true)); + } + + @Override + public Table union(Table right) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.union(operationTree, right.getQueryOperation(), false)); + } + + @Override + public Table unionAll(Table right) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.union(operationTree, right.getQueryOperation(), true)); + } + + @Override + public Table intersect(Table right) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.intersect(operationTree, right.getQueryOperation(), false)); + } + + @Override + public Table intersectAll(Table right) { + verifyTableCompatible(right); + + return createTable(operationTreeBuilder.intersect(operationTree, right.getQueryOperation(), true)); + } + + @Override + public Table orderBy(String fields) { + return createTable(operationTreeBuilder.sort(ExpressionParser.parseExpressionList(fields), operationTree)); + } + + @Override + public Table orderBy(Expression... fields) { + return createTable(operationTreeBuilder.sort(Arrays.asList(fields), operationTree)); + } + + @Override + public Table offset(int offset) { + return createTable(operationTreeBuilder.limitWithOffset(offset, operationTree)); + } + + @Override + public Table fetch(int fetch) { + if (fetch < 0) { + throw new ValidationException("FETCH count must be equal or larger than 0."); + } + return createTable(operationTreeBuilder.limitWithFetch(fetch, operationTree)); + } + + @Override + public void insertInto(String tablePath, String... tablePathContinued) { + tableEnvironment.insertInto(this, tablePath, tablePathContinued); + } + + @Override + public void insertInto(String tableName, QueryConfig conf) { + insertInto(conf, tableName); + } + + @Override + public void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued) { + tableEnvironment.insertInto(this, conf, tablePath, tablePathContinued); + } + + @Override + public GroupWindowedTable window(GroupWindow groupWindow) { + return new GroupWindowedTableImpl(this, groupWindow); + } + + @Override + public OverWindowedTable window(OverWindow... overWindows) { + + if (overWindows.length != 1) { + throw new TableException("Over-Windows are currently only supported single window."); Review comment: nit: "Currently, only a single over window is supported." ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
