twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions URL: https://github.com/apache/flink/pull/8062#discussion_r275313133
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AlgebraicOperationFactory.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.plan.logical.Intersect; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.Minus; +import org.apache.flink.table.plan.logical.Union; + +import java.util.stream.IntStream; + +import static org.apache.flink.table.operations.AlgebraicOperationFactory.AlgebraicTableOperationType.UNION; + +/** + * Utility class for creating a valid algebraic operation such as {@link Minus}, {@link Intersect} or {@link Union}. + */ +@Internal +public class AlgebraicOperationFactory { + + private final boolean isStreaming; + + /** + * Describes what kind of operation should be created. + */ + public enum AlgebraicTableOperationType { + INTERSECT, + MINUS, + UNION + } + + public AlgebraicOperationFactory(boolean isStreaming) { + this.isStreaming = isStreaming; + } + + /** + * Creates a valid algebraic operation. + * + * @param type type of operation to create + * @param left first relational operation of the operation + * @param right second relational operation of the operation + * @param all flag defining how duplicates should be handled + * @return creates a valid algebraic operation + */ + public TableOperation create( + AlgebraicTableOperationType type, + TableOperation left, + TableOperation right, + boolean all) { + LogicalNode leftNode = (LogicalNode) left; + LogicalNode rightNode = (LogicalNode) right; + failIfStreaming(type, all); + validateAlgebraicOperation(type, leftNode, right); + switch (type) { + case INTERSECT: + return new Intersect(leftNode, rightNode, all); + case MINUS: + return new Minus(leftNode, rightNode, all); + case UNION: + return new Union(leftNode, rightNode, all); + } + throw new TableException("Unknown algebraic operation type: " + type); + } + + private void validateAlgebraicOperation( + AlgebraicTableOperationType operationType, + TableOperation left, + TableOperation right) { + TableSchema leftSchema = left.getTableSchema(); + int leftFieldCount = leftSchema.getFieldCount(); + TableSchema rightSchema = right.getTableSchema(); + int rightFieldCount = rightSchema.getFieldCount(); + + if (leftFieldCount != rightFieldCount) { + throw new ValidationException(String.format("%s two table of different column sizes:" + + " %d and %d", operationType, leftFieldCount, rightFieldCount)); + } + + TypeInformation<?>[] leftFieldTypes = leftSchema.getFieldTypes(); + TypeInformation<?>[] rightFieldTypes = rightSchema.getFieldTypes(); + boolean sameSchema = IntStream.range(0, leftFieldCount) + .allMatch(idx -> leftFieldTypes[idx].equals(rightFieldTypes[idx])); + + if (!sameSchema) { + throw new ValidationException(String.format("%s two table of different schema:" + + " %s and" + + " %s", operationType, leftSchema, rightSchema)); + } + } + + private void failIfStreaming(AlgebraicTableOperationType type, boolean all) { + boolean shouldFailInCaseOfStreaming = !all || type != UNION; + + if (isStreaming && shouldFailInCaseOfStreaming) { + throw new ValidationException(String.format("%s on stream tables is currently not supported.", type)); Review comment: "The `type.toLowerCase` operation on unbounded tables is currently not supported." ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
