twalthr commented on a change in pull request #8764:  
[FLINK-12798][table-planner][table-api-java] Port TableEnvironment to 
table-api-java module
URL: https://github.com/apache/flink/pull/8764#discussion_r296147825
 
 

 ##########
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 ##########
 @@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.AggregatedTable;
+import org.apache.flink.table.api.FlatAggregateTable;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.GroupWindowedTable;
+import org.apache.flink.table.api.GroupedTable;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.OverWindowedTable;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WindowGroupedTable;
+import org.apache.flink.table.catalog.FunctionLookup;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+import org.apache.flink.table.expressions.LookupCallResolver;
+import org.apache.flink.table.functions.TemporalTableFunction;
+import org.apache.flink.table.functions.TemporalTableFunctionImpl;
+import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
+import org.apache.flink.table.operations.OperationExpressionsUtils;
+import 
org.apache.flink.table.operations.OperationExpressionsUtils.CategorizedExpressions;
+import org.apache.flink.table.operations.OperationTreeBuilder;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation for {@link Table}.
+ */
+@Internal
+public class TableImpl implements Table {
+
+       private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+       private final TableEnvironment tableEnvironment;
+       private final QueryOperation operationTree;
+       private final OperationTreeBuilder operationTreeBuilder;
+       private final LookupCallResolver callResolver;
+
+       private String tableName = null;
+
+       public TableEnvironment getTableEnvironment() {
+               return tableEnvironment;
+       }
+
+       private TableImpl(
+                       TableEnvironment tableEnvironment,
+                       QueryOperation operationTree,
+                       OperationTreeBuilder operationTreeBuilder,
+                       LookupCallResolver callResolver) {
+               this.tableEnvironment = tableEnvironment;
+               this.operationTree = operationTree;
+               this.operationTreeBuilder = operationTreeBuilder;
+               this.callResolver = callResolver;
+       }
+
+       public static TableImpl createTable(
+                       TableEnvironment tableEnvironment,
+                       QueryOperation operationTree,
+                       OperationTreeBuilder operationTreeBuilder,
+                       FunctionLookup functionCatalog) {
+               return new TableImpl(
+                       tableEnvironment,
+                       operationTree,
+                       operationTreeBuilder,
+                       new LookupCallResolver(functionCatalog));
+       }
+
+       @Override
+       public TableSchema getSchema() {
+               return operationTree.getTableSchema();
+       }
+
+       @Override
+       public void printSchema() {
+               System.out.println(getSchema());
+       }
+
+       @Override
+       public QueryOperation getQueryOperation() {
+               return operationTree;
+       }
+
+       @Override
+       public Table select(String fields) {
+               return 
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+       }
+
+       @Override
+       public Table select(Expression... fields) {
+               List<Expression> expressionsWithResolvedCalls = 
Arrays.stream(fields)
+                       .map(f -> f.accept(callResolver))
+                       .collect(Collectors.toList());
+               CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
+                       expressionsWithResolvedCalls
+               );
+
+               if (!extracted.getWindowProperties().isEmpty()) {
+                       throw new ValidationException("Window properties can 
only be used on windowed tables.");
+               }
+
+               if (!extracted.getAggregations().isEmpty()) {
+                       QueryOperation aggregate = 
operationTreeBuilder.aggregate(
+                               Collections.emptyList(),
+                               extracted.getAggregations(),
+                               operationTree
+                       );
+                       return 
createTable(operationTreeBuilder.project(extracted.getProjections(), aggregate, 
false));
+               } else {
+                       return 
createTable(operationTreeBuilder.project(expressionsWithResolvedCalls, 
operationTree, false));
+               }
+       }
+
+       @Override
+       public TemporalTableFunction createTemporalTableFunction(String 
timeAttribute, String primaryKey) {
+               return createTemporalTableFunction(
+                       ExpressionParser.parseExpression(timeAttribute),
+                       ExpressionParser.parseExpression(primaryKey));
+       }
+
+       @Override
+       public TemporalTableFunction createTemporalTableFunction(Expression 
timeAttribute, Expression primaryKey) {
+               Expression resolvedTimeAttribute = 
operationTreeBuilder.resolveExpression(timeAttribute, operationTree);
+               Expression resolvedPrimaryKey = 
operationTreeBuilder.resolveExpression(primaryKey, operationTree);
+
+               return TemporalTableFunctionImpl.create(
+                       operationTree,
+                       resolvedTimeAttribute,
+                       resolvedPrimaryKey);
+       }
+
+       @Override
+       public Table as(String fields) {
+               List<Expression> fieldsExprs = 
ExpressionParser.parseExpressionList(fields);
+               return createTable(operationTreeBuilder.alias(fieldsExprs, 
operationTree));
+       }
+
+       @Override
+       public Table as(Expression... fields) {
+               return 
createTable(operationTreeBuilder.alias(Arrays.asList(fields), operationTree));
+       }
+
+       @Override
+       public Table filter(String predicate) {
+               return filter(ExpressionParser.parseExpression(predicate));
+       }
+
+       @Override
+       public Table filter(Expression predicate) {
+               Expression resolvedCallPredicate = 
predicate.accept(callResolver);
+               return 
createTable(operationTreeBuilder.filter(resolvedCallPredicate, operationTree));
+       }
+
+       @Override
+       public Table where(String predicate) {
+               return filter(predicate);
+       }
+
+       @Override
+       public Table where(Expression predicate) {
+               return filter(predicate);
+       }
+
+       @Override
+       public GroupedTable groupBy(String fields) {
+               return new GroupedTableImpl(this, 
ExpressionParser.parseExpressionList(fields));
+       }
+
+       @Override
+       public GroupedTable groupBy(Expression... fields) {
+               return new GroupedTableImpl(this, Arrays.asList(fields));
+       }
+
+       @Override
+       public Table distinct() {
+               return 
createTable(operationTreeBuilder.distinct(operationTree));
+       }
+
+       @Override
+       public Table join(Table right) {
+               return joinInternal(right, Optional.empty(), JoinType.INNER);
+       }
+
+       @Override
+       public Table join(Table right, String joinPredicate) {
+               return join(right, 
ExpressionParser.parseExpression(joinPredicate));
+       }
+
+       @Override
+       public Table join(Table right, Expression joinPredicate) {
+               return joinInternal(right, Optional.of(joinPredicate), 
JoinType.INNER);
+       }
+
+       @Override
+       public Table leftOuterJoin(Table right) {
+               return joinInternal(right, Optional.empty(), 
JoinType.LEFT_OUTER);
+       }
+
+       @Override
+       public Table leftOuterJoin(Table right, String joinPredicate) {
+               return leftOuterJoin(right, 
ExpressionParser.parseExpression(joinPredicate));
+       }
+
+       @Override
+       public Table leftOuterJoin(Table right, Expression joinPredicate) {
+               return joinInternal(right, Optional.of(joinPredicate), 
JoinType.LEFT_OUTER);
+       }
+
+       @Override
+       public Table rightOuterJoin(Table right, String joinPredicate) {
+               return rightOuterJoin(right, 
ExpressionParser.parseExpression(joinPredicate));
+       }
+
+       @Override
+       public Table rightOuterJoin(Table right, Expression joinPredicate) {
+               return joinInternal(right, Optional.of(joinPredicate), 
JoinType.RIGHT_OUTER);
+       }
+
+       @Override
+       public Table fullOuterJoin(Table right, String joinPredicate) {
+               return fullOuterJoin(right, 
ExpressionParser.parseExpression(joinPredicate));
+       }
+
+       @Override
+       public Table fullOuterJoin(Table right, Expression joinPredicate) {
+               return joinInternal(right, Optional.of(joinPredicate), 
JoinType.FULL_OUTER);
+       }
+
+       private TableImpl joinInternal(
+                       Table right,
+                       Optional<Expression> joinPredicate,
+                       JoinType joinType) {
+               verifyTableCompatible(right);
+
+               return createTable(operationTreeBuilder.join(
+                       this.operationTree,
+                       right.getQueryOperation(),
+                       joinType,
+                       joinPredicate,
+                       false));
+       }
+
+       private void verifyTableCompatible(Table right) {
+               // check that the TableEnvironment of right table is not null
+               // and right table belongs to the same TableEnvironment
+               if (((TableImpl) right).getTableEnvironment() != 
this.tableEnvironment) {
+                       throw new ValidationException("Only tables from the 
same TableEnvironment can be joined.");
+               }
+       }
+
+       @Override
+       public Table joinLateral(String tableFunctionCall) {
+               return 
joinLateral(ExpressionParser.parseExpression(tableFunctionCall));
+       }
+
+       @Override
+       public Table joinLateral(Expression tableFunctionCall) {
+               return joinLateralInternal(tableFunctionCall, Optional.empty(), 
JoinType.INNER);
+       }
+
+       @Override
+       public Table joinLateral(String tableFunctionCall, String 
joinPredicate) {
+               return joinLateral(
+                       ExpressionParser.parseExpression(tableFunctionCall),
+                       ExpressionParser.parseExpression(joinPredicate));
+       }
+
+       @Override
+       public Table joinLateral(Expression tableFunctionCall, Expression 
joinPredicate) {
+               return joinLateralInternal(tableFunctionCall, 
Optional.of(joinPredicate), JoinType.INNER);
+       }
+
+       @Override
+       public Table leftOuterJoinLateral(String tableFunctionCall) {
+               return 
leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall));
+       }
+
+       @Override
+       public Table leftOuterJoinLateral(Expression tableFunctionCall) {
+               return joinLateralInternal(tableFunctionCall, Optional.empty(), 
JoinType.LEFT_OUTER);
+       }
+
+       @Override
+       public Table leftOuterJoinLateral(String tableFunctionCall, String 
joinPredicate) {
+               return leftOuterJoinLateral(
+                       ExpressionParser.parseExpression(tableFunctionCall),
+                       ExpressionParser.parseExpression(joinPredicate));
+       }
+
+       @Override
+       public Table leftOuterJoinLateral(Expression tableFunctionCall, 
Expression joinPredicate) {
+               return joinLateralInternal(tableFunctionCall, 
Optional.of(joinPredicate), JoinType.LEFT_OUTER);
+       }
+
+       private TableImpl joinLateralInternal(
+                       Expression callExpr,
+                       Optional<Expression> joinPredicate,
+                       JoinType joinType) {
+
+               // check join type
+               if (joinType != JoinType.INNER && joinType != 
JoinType.LEFT_OUTER) {
+                       throw new ValidationException(
+                               "Table functions are currently only supported 
for inner and left outer lateral joins.");
+               }
+
+               return createTable(operationTreeBuilder.joinLateral(
+                       this.operationTree,
+                       callExpr,
+                       joinType,
+                       joinPredicate));
+       }
+
+       @Override
+       public Table minus(Table right) {
+               verifyTableCompatible(right);
+
+               return createTable(operationTreeBuilder.minus(operationTree, 
right.getQueryOperation(), false));
+       }
+
+       @Override
+       public Table minusAll(Table right) {
+               verifyTableCompatible(right);
+
+               return createTable(operationTreeBuilder.minus(operationTree, 
right.getQueryOperation(), true));
+       }
+
+       @Override
+       public Table union(Table right) {
+               verifyTableCompatible(right);
+
+               return createTable(operationTreeBuilder.union(operationTree, 
right.getQueryOperation(), false));
+       }
+
+       @Override
+       public Table unionAll(Table right) {
+               verifyTableCompatible(right);
+
+               return createTable(operationTreeBuilder.union(operationTree, 
right.getQueryOperation(), true));
+       }
+
+       @Override
+       public Table intersect(Table right) {
+               verifyTableCompatible(right);
+
+               return 
createTable(operationTreeBuilder.intersect(operationTree, 
right.getQueryOperation(), false));
+       }
+
+       @Override
+       public Table intersectAll(Table right) {
+               verifyTableCompatible(right);
+
+               return 
createTable(operationTreeBuilder.intersect(operationTree, 
right.getQueryOperation(), true));
+       }
+
+       @Override
+       public Table orderBy(String fields) {
+               return 
createTable(operationTreeBuilder.sort(ExpressionParser.parseExpressionList(fields),
 operationTree));
+       }
+
+       @Override
+       public Table orderBy(Expression... fields) {
+               return 
createTable(operationTreeBuilder.sort(Arrays.asList(fields), operationTree));
+       }
+
+       @Override
+       public Table offset(int offset) {
+               return createTable(operationTreeBuilder.limitWithOffset(offset, 
operationTree));
+       }
+
+       @Override
+       public Table fetch(int fetch) {
+               if (fetch < 0) {
+                       throw new ValidationException("FETCH count must be 
equal or larger than 0.");
+               }
+               return createTable(operationTreeBuilder.limitWithFetch(fetch, 
operationTree));
+       }
+
+       @Override
+       public void insertInto(String tablePath, String... tablePathContinued) {
+               tableEnvironment.insertInto(this, tablePath, 
tablePathContinued);
+       }
+
+       @Override
+       public void insertInto(String tableName, QueryConfig conf) {
+               insertInto(conf, tableName);
+       }
+
+       @Override
+       public void insertInto(QueryConfig conf, String tablePath, String... 
tablePathContinued) {
+               tableEnvironment.insertInto(this, conf, tablePath, 
tablePathContinued);
+       }
+
+       @Override
+       public GroupWindowedTable window(GroupWindow groupWindow) {
+               return new GroupWindowedTableImpl(this, groupWindow);
+       }
+
+       @Override
+       public OverWindowedTable window(OverWindow... overWindows) {
+
+               if (overWindows.length != 1) {
+                       throw new TableException("Over-Windows are currently 
only supported single window.");
 
 Review comment:
   nit: "Currently, only a single over window is supported."

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to