This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 423cdcb99c3f66be435ad2e70d6a15f10a69e252 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Mon Feb 13 23:07:44 2023 +0100 [FLINK-27998][table] Upgrade Calcite from 1.29.0 to 1.30.0 This closes apache/flink#21934 --- flink-table/flink-sql-parser/pom.xml | 4 +- .../flink/sql/parser/FlinkDDLDataTypeTest.java | 8 +- .../flink/sql/parser/FlinkSqlParserImplTest.java | 7 +- .../flink/sql/parser/FlinkSqlUnParserTest.java | 15 +- flink-table/flink-table-planner/pom.xml | 6 +- .../org/apache/calcite/rel/core/Correlate.java | 296 ----------- .../calcite/rel/logical/LogicalCorrelate.java | 188 ------- .../org/apache/calcite/sql/fun/SqlRowOperator.java | 128 +++++ .../calcite/sql/type/SqlTypeFactoryImpl.java | 554 +++++++++++++++++++++ .../calcite/sql/validate/SqlValidatorImpl.java | 28 +- .../apache/calcite/sql2rel/SqlToRelConverter.java | 127 +++-- .../java/org/apache/calcite/tools/RelBuilder.java | 8 +- .../nodes/exec/serde/RexNodeJsonDeserializer.java | 17 +- .../nodes/exec/serde/RexNodeJsonSerializer.java | 5 +- .../flink/table/planner/typeutils/SymbolUtil.java | 6 + .../src/main/resources/META-INF/NOTICE | 4 +- .../planner/calcite/FlinkLogicalRelFactories.scala | 1 + .../table/planner/calcite/FlinkTypeFactory.scala | 6 +- .../planner/calcite/PreValidateReWriter.scala | 1 + .../plan/schema/TimeIndicatorRelDataType.scala | 4 +- .../plan/nodes/exec/stream/CalcJsonPlanTest.java | 14 + .../stream/jsonplan/SargJsonPlanITCase.java | 46 ++ .../stream/CalcJsonPlanTest_jsonplan/testSarg.out | 157 ++++++ .../src/main/resources/META-INF/NOTICE | 2 +- flink-table/pom.xml | 6 +- 25 files changed, 1063 insertions(+), 575 deletions(-) diff --git a/flink-table/flink-sql-parser/pom.xml b/flink-table/flink-sql-parser/pom.xml index 4412ade6072..4da406fe319 100644 --- a/flink-table/flink-sql-parser/pom.xml +++ b/flink-table/flink-sql-parser/pom.xml @@ -62,9 +62,9 @@ under the License. <version>${calcite.version}</version> <exclusions> <!-- - "mvn dependency:tree" as of Calcite 1.29.0: + "mvn dependency:tree" as of Calcite 1.30.0: - [INFO] +- org.apache.calcite:calcite-core:jar:1.29.0:compile + [INFO] +- org.apache.calcite:calcite-core:jar:1.30.0:compile [INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.20.0:compile [INFO] | \- org.checkerframework:checker-qual:jar:3.10.0:compile diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java index 1fe0d63ba41..3df21df7abc 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java @@ -547,12 +547,12 @@ class FlinkDDLDataTypeTest { private final SqlParser.Config parserConfig; TestFactory() { - this(DEFAULT_OPTIONS, MockCatalogReaderSimple::new, SqlValidatorUtil::newValidator); + this(DEFAULT_OPTIONS, MockCatalogReaderSimple::create, SqlValidatorUtil::newValidator); } TestFactory( Map<String, Object> options, - SqlTestFactory.MockCatalogReaderFactory catalogReaderFactory, + SqlTestFactory.CatalogReaderFactory catalogReaderFactory, SqlTestFactory.ValidatorFactory validatorFactory) { this.options = options; this.validatorFactory = validatorFactory; @@ -560,7 +560,7 @@ class FlinkDDLDataTypeTest { createOperatorTable((SqlOperatorTable) options.get("operatorTable")); this.typeFactory = createTypeFactory((SqlConformance) options.get("conformance")); Boolean caseSensitive = (Boolean) options.get("caseSensitive"); - this.catalogReader = catalogReaderFactory.create(typeFactory, caseSensitive).init(); + this.catalogReader = catalogReaderFactory.create(typeFactory, caseSensitive); this.parserConfig = createParserConfig(options); } @@ -589,7 +589,7 @@ class FlinkDDLDataTypeTest { catalogReader, typeFactory, SqlValidator.Config.DEFAULT - .withSqlConformance(conformance) + .withConformance(conformance) .withTypeCoercionEnabled(enableTypeCoercion)); } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index b271805399d..86901433b68 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -24,7 +24,7 @@ import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParserImplFactory; +import org.apache.calcite.sql.parser.SqlParserFixture; import org.apache.calcite.sql.parser.SqlParserTest; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -42,9 +42,8 @@ import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; @Execution(CONCURRENT) class FlinkSqlParserImplTest extends SqlParserTest { - @Override - protected SqlParserImplFactory parserImplFactory() { - return FlinkSqlParserImpl.FACTORY; + public SqlParserFixture fixture() { + return super.fixture().withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY)); } @Test diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java index 09142717241..cced9d2e440 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java @@ -18,6 +18,9 @@ package org.apache.flink.sql.parser; +import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; + +import org.apache.calcite.sql.parser.SqlParserFixture; import org.junit.jupiter.api.parallel.Execution; import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; @@ -34,13 +37,9 @@ class FlinkSqlUnParserTest extends FlinkSqlParserImplTest { // ~ Methods ---------------------------------------------------------------- - @Override - protected boolean isUnparserTest() { - return true; - } - - @Override - protected Tester getTester() { - return new UnparsingTesterImpl(); + public SqlParserFixture fixture() { + return super.fixture() + .withTester(new UnparsingTesterImpl()) + .withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY)); } } diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml index 7bf89eef10e..9cb54e44b80 100644 --- a/flink-table/flink-table-planner/pom.xml +++ b/flink-table/flink-table-planner/pom.xml @@ -129,10 +129,10 @@ under the License. <version>${calcite.version}</version> <exclusions> <!-- - "mvn dependency:tree" as of Calcite 1.29.0: + "mvn dependency:tree" as of Calcite 1.30.0: - [INFO] +- org.apache.calcite:calcite-core:jar:1.29.0:compile - [INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.29.0:compile + [INFO] +- org.apache.calcite:calcite-core:jar:1.30.0:compile + [INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.30.0:compile [INFO] | +- com.esri.geometry:esri-geometry-api:jar:2.2.0:compile [INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.13.4:compile [INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.20.0:compile diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Correlate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Correlate.java deleted file mode 100644 index 180010eabb5..00000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Correlate.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.rel.core; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptCost; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.BiRel; -import org.apache.calcite.rel.RelInput; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.hint.Hintable; -import org.apache.calcite.rel.hint.RelHint; -import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.validate.SqlValidatorUtil; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Litmus; -import org.checkerframework.checker.nullness.qual.Nullable; - -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import static java.util.Objects.requireNonNull; - -/** - * A relational operator that performs nested-loop joins. - * - * <p>Temporarily copy from calcite to cherry-pick [CALCITE-5107] and will be removed when upgrade - * the latest calcite. - * - * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join}, but works by setting - * variables in its environment and restarting its right-hand input. - * - * <p>Correlate is not a join since: typical rules should not match Correlate. - * - * <p>A Correlate is used to represent a correlated query. One implementation strategy is to - * de-correlate the expression. - * - * <table> - * <caption>Mapping of physical operations to logical ones</caption> - * <tr><th>Physical operation</th><th>Logical operation</th></tr> - * <tr><td>NestedLoops</td><td>Correlate(A, B, regular)</td></tr> - * <tr><td>NestedLoopsOuter</td><td>Correlate(A, B, outer)</td></tr> - * <tr><td>NestedLoopsSemi</td><td>Correlate(A, B, semi)</td></tr> - * <tr><td>NestedLoopsAnti</td><td>Correlate(A, B, anti)</td></tr> - * <tr><td>HashJoin</td><td>EquiJoin(A, B)</td></tr> - * <tr><td>HashJoinOuter</td><td>EquiJoin(A, B, outer)</td></tr> - * <tr><td>HashJoinSemi</td><td>SemiJoin(A, B, semi)</td></tr> - * <tr><td>HashJoinAnti</td><td>SemiJoin(A, B, anti)</td></tr> - * </table> - * - * @see CorrelationId - */ -public abstract class Correlate extends BiRel implements Hintable { - // ~ Instance fields -------------------------------------------------------- - - protected final CorrelationId correlationId; - protected final ImmutableBitSet requiredColumns; - protected final JoinRelType joinType; - protected final ImmutableList<RelHint> hints; - - // ~ Constructors ----------------------------------------------------------- - - /** - * Creates a Correlate. - * - * @param cluster Cluster this relational expression belongs to - * @param hints Hints for this node - * @param left Left input relational expression - * @param right Right input relational expression - * @param correlationId Variable name for the row of left input - * @param requiredColumns Set of columns that are used by correlation - * @param joinType Join type - */ - @SuppressWarnings("method.invocation.invalid") - protected Correlate( - RelOptCluster cluster, - RelTraitSet traitSet, - List<RelHint> hints, - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - super(cluster, traitSet, left, right); - assert !joinType.generatesNullsOnLeft() : "Correlate has invalid join type " + joinType; - this.joinType = requireNonNull(joinType, "joinType"); - this.correlationId = requireNonNull(correlationId, "correlationId"); - this.requiredColumns = requireNonNull(requiredColumns, "requiredColumns"); - this.hints = ImmutableList.copyOf(hints); - } - - /** - * Creates a Correlate. - * - * @param cluster Cluster this relational expression belongs to - * @param left Left input relational expression - * @param right Right input relational expression - * @param correlationId Variable name for the row of left input - * @param requiredColumns Set of columns that are used by correlation - * @param joinType Join type - */ - protected Correlate( - RelOptCluster cluster, - RelTraitSet traitSet, - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - this( - cluster, - traitSet, - Collections.emptyList(), - left, - right, - correlationId, - requiredColumns, - joinType); - } - - /** - * Creates a Correlate by parsing serialized output. - * - * @param input Input representation - */ - protected Correlate(RelInput input) { - this( - input.getCluster(), - input.getTraitSet(), - input.getInputs().get(0), - input.getInputs().get(1), - new CorrelationId( - requireNonNull((Integer) input.get("correlation"), "correlation")), - input.getBitSet("requiredColumns"), - requireNonNull(input.getEnum("joinType", JoinRelType.class), "joinType")); - } - - // ~ Methods ---------------------------------------------------------------- - - @Override - public boolean isValid(Litmus litmus, RelNode.Context context) { - ImmutableBitSet leftColumns = ImmutableBitSet.range(left.getRowType().getFieldCount()); - return super.isValid(litmus, context) - && litmus.check( - leftColumns.contains(requiredColumns), - "Required columns {} not subset of left columns {}", - requiredColumns, - leftColumns) - && RelOptUtil.notContainsCorrelation(left, correlationId, litmus); - } - - @Override - public Correlate copy(RelTraitSet traitSet, List<RelNode> inputs) { - assert inputs.size() == 2; - return copy( - traitSet, inputs.get(0), inputs.get(1), correlationId, requiredColumns, joinType); - } - - public abstract Correlate copy( - RelTraitSet traitSet, - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType); - - public JoinRelType getJoinType() { - return joinType; - } - - @Override - protected RelDataType deriveRowType() { - switch (joinType) { - case LEFT: - case INNER: - return SqlValidatorUtil.deriveJoinRowType( - left.getRowType(), - right.getRowType(), - joinType, - getCluster().getTypeFactory(), - null, - ImmutableList.of()); - case ANTI: - case SEMI: - return left.getRowType(); - default: - throw new IllegalStateException("Unknown join type " + joinType); - } - } - - @Override - public RelWriter explainTerms(RelWriter pw) { - return super.explainTerms(pw) - .item("correlation", correlationId) - .item("joinType", joinType.lowerName) - .item("requiredColumns", requiredColumns); - } - - /** - * Returns the correlating expressions. - * - * @return correlating expressions - */ - public CorrelationId getCorrelationId() { - return correlationId; - } - - @Override - public String getCorrelVariable() { - return correlationId.getName(); - } - - /** - * Returns the required columns in left relation required for the correlation in the right. - * - * @return columns in left relation required for the correlation in the right - */ - public ImmutableBitSet getRequiredColumns() { - return requiredColumns; - } - - @Override - public Set<CorrelationId> getVariablesSet() { - return ImmutableSet.of(correlationId); - } - - @Override - public double estimateRowCount(RelMetadataQuery mq) { - double leftRowCount = mq.getRowCount(left); - switch (joinType) { - case SEMI: - case ANTI: - return leftRowCount; - default: - return leftRowCount * mq.getRowCount(right); - } - } - - @Override - public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { - double rowCount = mq.getRowCount(this); - - final double rightRowCount = right.estimateRowCount(mq); - final double leftRowCount = left.estimateRowCount(mq); - if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) { - return planner.getCostFactory().makeInfiniteCost(); - } - - Double restartCount = mq.getRowCount(getLeft()); - if (restartCount == null) { - return planner.getCostFactory().makeInfiniteCost(); - } - // RelMetadataQuery.getCumulativeCost(getRight()); does not work for - // RelSubset, so we ask planner to cost-estimate right relation - RelOptCost rightCost = planner.getCost(getRight(), mq); - if (rightCost == null) { - return planner.getCostFactory().makeInfiniteCost(); - } - RelOptCost rescanCost = rightCost.multiplyBy(Math.max(1.0, restartCount - 1)); - - return planner.getCostFactory() - .makeCost( - rowCount /* generate results */ + leftRowCount /* scan left results */, - 0, - 0) - .plus(rescanCost); - } - - @Override - public ImmutableList<RelHint> getHints() { - return hints; - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java deleted file mode 100644 index fe56ba5b600..00000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.rel.logical; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelInput; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelShuttle; -import org.apache.calcite.rel.core.Correlate; -import org.apache.calcite.rel.core.CorrelationId; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.hint.RelHint; -import org.apache.calcite.util.ImmutableBitSet; - -import java.util.Collections; -import java.util.List; - -import static java.util.Objects.requireNonNull; - -/** - * A relational operator that performs nested-loop joins. - * - * <p>Temporarily copy from calcite to cherry-pick [CALCITE-5107] and will be removed when upgrade - * the latest calcite. - * - * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join}, but works by setting - * variables in its environment and restarting its right-hand input. - * - * <p>A LogicalCorrelate is used to represent a correlated query. One implementation strategy is to - * de-correlate the expression. - * - * @see org.apache.calcite.rel.core.CorrelationId - */ -public final class LogicalCorrelate extends Correlate { - // ~ Instance fields -------------------------------------------------------- - - // ~ Constructors ----------------------------------------------------------- - - /** - * Creates a LogicalCorrelate. - * - * @param cluster cluster this relational expression belongs to - * @param hints hints for this node - * @param left left input relational expression - * @param right right input relational expression - * @param correlationId variable name for the row of left input - * @param requiredColumns Required columns - * @param joinType join type - */ - public LogicalCorrelate( - RelOptCluster cluster, - RelTraitSet traitSet, - List<RelHint> hints, - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - super(cluster, traitSet, hints, left, right, correlationId, requiredColumns, joinType); - } - - /** - * Creates a LogicalCorrelate. - * - * @param cluster cluster this relational expression belongs to - * @param left left input relational expression - * @param right right input relational expression - * @param correlationId variable name for the row of left input - * @param requiredColumns Required columns - * @param joinType join type - */ - public LogicalCorrelate( - RelOptCluster cluster, - RelTraitSet traitSet, - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - this( - cluster, - traitSet, - Collections.emptyList(), - left, - right, - correlationId, - requiredColumns, - joinType); - } - - /** Creates a LogicalCorrelate by parsing serialized output. */ - public LogicalCorrelate(RelInput input) { - this( - input.getCluster(), - input.getTraitSet(), - input.getInputs().get(0), - input.getInputs().get(1), - new CorrelationId( - (Integer) requireNonNull(input.get("correlation"), "correlation")), - input.getBitSet("requiredColumns"), - requireNonNull(input.getEnum("joinType", JoinRelType.class), "joinType")); - } - - /** Creates a LogicalCorrelate. */ - public static LogicalCorrelate create( - RelNode left, - RelNode right, - List<RelHint> hints, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - final RelOptCluster cluster = left.getCluster(); - final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); - return new LogicalCorrelate( - cluster, traitSet, hints, left, right, correlationId, requiredColumns, joinType); - } - - /** Creates a LogicalCorrelate. */ - public static LogicalCorrelate create( - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - final RelOptCluster cluster = left.getCluster(); - final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); - return new LogicalCorrelate( - cluster, traitSet, left, right, correlationId, requiredColumns, joinType); - } - - // ~ Methods ---------------------------------------------------------------- - - @Override - public LogicalCorrelate copy( - RelTraitSet traitSet, - RelNode left, - RelNode right, - CorrelationId correlationId, - ImmutableBitSet requiredColumns, - JoinRelType joinType) { - assert traitSet.containsIfApplicable(Convention.NONE); - return new LogicalCorrelate( - getCluster(), - traitSet, - hints, - left, - right, - correlationId, - requiredColumns, - joinType); - } - - @Override - public RelNode accept(RelShuttle shuttle) { - return shuttle.visit(this); - } - - @Override - public RelNode withHints(List<RelHint> hintList) { - return new LogicalCorrelate( - getCluster(), - traitSet, - hintList, - left, - right, - correlationId, - requiredColumns, - joinType); - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java new file mode 100644 index 00000000000..d861f2af623 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.calcite.sql.fun; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.type.InferTypes; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.util.Pair; + +import java.util.AbstractList; +import java.util.Map; + +/** + * Copied to keep null semantics of table api and sql in sync. At the same time SQL standard says + * that the next about `ROW`: + * + * <ul> + * <li>The value of {@code R IS NULL} is: + * <ul> + * <li>If the value of every field of V is the null value, then True. + * <li>Otherwise, False. + * </ul> + * <li>The value of {@code R IS NOT NULL} is: + * <ul> + * <li>If the value of no field of V is the null value, then True. + * <li>Otherwise, False. + * </ul> + * </ul> + * + * <p>Calcite applies that logic since <a + * href="https://issues.apache.org/jira/browse/CALCITE-3627">CALCITE-3627</a> (1.30.0+). + * + * <ul> + * <li>Thus, with Calcite 1.30.0+ + * <ul> + * <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NOT NULL; -- returns + * FALSE} + * <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NULL; -- returns TRUE}. + * </ul> + * <li>With Flink and Calcite before 1.30.0 (current behavior of this class) + * <ul> + * <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NOT NULL; -- returns TRUE} + * <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NULL; -- returns FALSE} + * </ul> + * </ul> + * + * Once Flink applies same logic for both table api and sql, this class should be removed. + * + * <p>Changed lines + * + * <ol> + * <li>Line 92 ~ 112 + * </ol> + */ +public class SqlRowOperator extends SqlSpecialOperator { + // ~ Constructors ----------------------------------------------------------- + + public SqlRowOperator(String name) { + super( + name, + SqlKind.ROW, + MDX_PRECEDENCE, + false, + null, + InferTypes.RETURN_TYPE, + OperandTypes.VARIADIC); + } + + // ~ Methods ---------------------------------------------------------------- + + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + // ----- FLINK MODIFICATION BEGIN ----- + // The type of a ROW(e1,e2) expression is a record with the types + // {e1type,e2type}. According to the standard, field names are + // implementation-defined. + return opBinding + .getTypeFactory() + .createStructType( + new AbstractList<Map.Entry<String, RelDataType>>() { + @Override + public Map.Entry<String, RelDataType> get(int index) { + return Pair.of( + SqlUtil.deriveAliasFromOrdinal(index), + opBinding.getOperandType(index)); + } + + @Override + public int size() { + return opBinding.getOperandCount(); + } + }); + // ----- FLINK MODIFICATION END ----- + } + + @Override + public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + SqlUtil.unparseFunctionSyntax(this, writer, call, false); + } + + // override SqlOperator + @Override + public boolean requiresDecimalExpansion() { + return false; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java new file mode 100644 index 00000000000..a55c40475e2 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.sql.type; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeFamily; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.util.Util; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.nio.charset.Charset; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Default implementation {@link SqlTypeFactoryImpl}, the class was copied over because of + * FLINK-31350. + * + * <p>FLINK modifications are at lines + * + * <ol> + * <li>Should be removed after fix of FLINK-31350: Lines 541 ~ 553. + * </ol> + */ +public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl { + // ~ Constructors ----------------------------------------------------------- + + public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) { + super(typeSystem); + } + + // ~ Methods ---------------------------------------------------------------- + + @Override + public RelDataType createSqlType(SqlTypeName typeName) { + if (typeName.allowsPrec()) { + return createSqlType(typeName, typeSystem.getDefaultPrecision(typeName)); + } + assertBasic(typeName); + RelDataType newType = new BasicSqlType(typeSystem, typeName); + return canonize(newType); + } + + @Override + public RelDataType createSqlType(SqlTypeName typeName, int precision) { + final int maxPrecision = typeSystem.getMaxPrecision(typeName); + if (maxPrecision >= 0 && precision > maxPrecision) { + precision = maxPrecision; + } + if (typeName.allowsScale()) { + return createSqlType(typeName, precision, typeName.getDefaultScale()); + } + assertBasic(typeName); + assert (precision >= 0) || (precision == RelDataType.PRECISION_NOT_SPECIFIED); + // Does not check precision when typeName is SqlTypeName#NULL. + RelDataType newType = + precision == RelDataType.PRECISION_NOT_SPECIFIED + ? new BasicSqlType(typeSystem, typeName) + : new BasicSqlType(typeSystem, typeName, precision); + newType = SqlTypeUtil.addCharsetAndCollation(newType, this); + return canonize(newType); + } + + @Override + public RelDataType createSqlType(SqlTypeName typeName, int precision, int scale) { + assertBasic(typeName); + assert (precision >= 0) || (precision == RelDataType.PRECISION_NOT_SPECIFIED); + final int maxPrecision = typeSystem.getMaxPrecision(typeName); + if (maxPrecision >= 0 && precision > maxPrecision) { + precision = maxPrecision; + } + RelDataType newType = new BasicSqlType(typeSystem, typeName, precision, scale); + newType = SqlTypeUtil.addCharsetAndCollation(newType, this); + return canonize(newType); + } + + @Override + public RelDataType createUnknownType() { + // FLINK MODIFICATION BEGIN + return canonize(new UnknownSqlType(this)); + // FLINK MODIFICATION END + } + + @Override + public RelDataType createMultisetType(RelDataType type, long maxCardinality) { + assert maxCardinality == -1; + RelDataType newType = new MultisetSqlType(type, false); + return canonize(newType); + } + + @Override + public RelDataType createArrayType(RelDataType elementType, long maxCardinality) { + assert maxCardinality == -1; + ArraySqlType newType = new ArraySqlType(elementType, false); + return canonize(newType); + } + + @Override + public RelDataType createMapType(RelDataType keyType, RelDataType valueType) { + MapSqlType newType = new MapSqlType(keyType, valueType, false); + return canonize(newType); + } + + @Override + public RelDataType createSqlIntervalType(SqlIntervalQualifier intervalQualifier) { + RelDataType newType = new IntervalSqlType(typeSystem, intervalQualifier, false); + return canonize(newType); + } + + @Override + public RelDataType createTypeWithCharsetAndCollation( + RelDataType type, Charset charset, SqlCollation collation) { + assert SqlTypeUtil.inCharFamily(type) : type; + requireNonNull(charset, "charset"); + requireNonNull(collation, "collation"); + RelDataType newType; + if (type instanceof BasicSqlType) { + BasicSqlType sqlType = (BasicSqlType) type; + newType = sqlType.createWithCharsetAndCollation(charset, collation); + } else if (type instanceof JavaType) { + JavaType javaType = (JavaType) type; + newType = + new JavaType( + javaType.getJavaClass(), javaType.isNullable(), charset, collation); + } else { + throw Util.needToImplement("need to implement " + type); + } + return canonize(newType); + } + + @Override + public @Nullable RelDataType leastRestrictive(List<RelDataType> types) { + assert types != null; + assert types.size() >= 1; + + RelDataType type0 = types.get(0); + if (type0.getSqlTypeName() != null) { + RelDataType resultType = leastRestrictiveSqlType(types); + if (resultType != null) { + return resultType; + } + return leastRestrictiveByCast(types); + } + + return super.leastRestrictive(types); + } + + private @Nullable RelDataType leastRestrictiveByCast(List<RelDataType> types) { + RelDataType resultType = types.get(0); + boolean anyNullable = resultType.isNullable(); + for (int i = 1; i < types.size(); i++) { + RelDataType type = types.get(i); + if (type.getSqlTypeName() == SqlTypeName.NULL) { + anyNullable = true; + continue; + } + + if (type.isNullable()) { + anyNullable = true; + } + + if (SqlTypeUtil.canCastFrom(type, resultType, false)) { + resultType = type; + } else { + if (!SqlTypeUtil.canCastFrom(resultType, type, false)) { + return null; + } + } + } + if (anyNullable) { + return createTypeWithNullability(resultType, true); + } else { + return resultType; + } + } + + @Override + public RelDataType createTypeWithNullability(final RelDataType type, final boolean nullable) { + final RelDataType newType; + if (type instanceof BasicSqlType) { + newType = ((BasicSqlType) type).createWithNullability(nullable); + } else if (type instanceof MapSqlType) { + newType = copyMapType(type, nullable); + } else if (type instanceof ArraySqlType) { + newType = copyArrayType(type, nullable); + } else if (type instanceof MultisetSqlType) { + newType = copyMultisetType(type, nullable); + } else if (type instanceof IntervalSqlType) { + newType = copyIntervalType(type, nullable); + } else if (type instanceof ObjectSqlType) { + newType = copyObjectType(type, nullable); + } else { + return super.createTypeWithNullability(type, nullable); + } + return canonize(newType); + } + + private static void assertBasic(SqlTypeName typeName) { + assert typeName != null; + assert typeName != SqlTypeName.MULTISET : "use createMultisetType() instead"; + assert typeName != SqlTypeName.ARRAY : "use createArrayType() instead"; + assert typeName != SqlTypeName.MAP : "use createMapType() instead"; + assert typeName != SqlTypeName.ROW : "use createStructType() instead"; + assert !SqlTypeName.INTERVAL_TYPES.contains(typeName) + : "use createSqlIntervalType() instead"; + } + + private @Nullable RelDataType leastRestrictiveSqlType(List<RelDataType> types) { + RelDataType resultType = null; + int nullCount = 0; + int nullableCount = 0; + int javaCount = 0; + int anyCount = 0; + + for (RelDataType type : types) { + final SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return null; + } + if (typeName == SqlTypeName.ANY) { + anyCount++; + } + if (type.isNullable()) { + ++nullableCount; + } + if (typeName == SqlTypeName.NULL) { + ++nullCount; + } + if (isJavaType(type)) { + ++javaCount; + } + } + + // if any of the inputs are ANY, the output is ANY + if (anyCount > 0) { + return createTypeWithNullability( + createSqlType(SqlTypeName.ANY), nullCount > 0 || nullableCount > 0); + } + + for (int i = 0; i < types.size(); ++i) { + RelDataType type = types.get(i); + RelDataTypeFamily family = type.getFamily(); + + final SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == SqlTypeName.NULL) { + continue; + } + + // Convert Java types; for instance, JavaType(int) becomes INTEGER. + // Except if all types are either NULL or Java types. + if (isJavaType(type) && javaCount + nullCount < types.size()) { + final RelDataType originalType = type; + type = + typeName.allowsPrecScale(true, true) + ? createSqlType(typeName, type.getPrecision(), type.getScale()) + : typeName.allowsPrecScale(true, false) + ? createSqlType(typeName, type.getPrecision()) + : createSqlType(typeName); + type = createTypeWithNullability(type, originalType.isNullable()); + } + + if (resultType == null) { + resultType = type; + SqlTypeName sqlTypeName = resultType.getSqlTypeName(); + if (sqlTypeName == SqlTypeName.ROW) { + return leastRestrictiveStructuredType(types); + } + if (sqlTypeName == SqlTypeName.ARRAY || sqlTypeName == SqlTypeName.MULTISET) { + return leastRestrictiveArrayMultisetType(types, sqlTypeName); + } + if (sqlTypeName == SqlTypeName.MAP) { + return leastRestrictiveMapType(types, sqlTypeName); + } + } + + RelDataTypeFamily resultFamily = resultType.getFamily(); + SqlTypeName resultTypeName = resultType.getSqlTypeName(); + + if (resultFamily != family) { + return null; + } + if (SqlTypeUtil.inCharOrBinaryFamilies(type)) { + Charset charset1 = type.getCharset(); + Charset charset2 = resultType.getCharset(); + SqlCollation collation1 = type.getCollation(); + SqlCollation collation2 = resultType.getCollation(); + + final int precision = + SqlTypeUtil.maxPrecision(resultType.getPrecision(), type.getPrecision()); + + // If either type is LOB, then result is LOB with no precision. + // Otherwise, if either is variable width, result is variable + // width. Otherwise, result is fixed width. + if (SqlTypeUtil.isLob(resultType)) { + resultType = createSqlType(resultType.getSqlTypeName()); + } else if (SqlTypeUtil.isLob(type)) { + resultType = createSqlType(type.getSqlTypeName()); + } else if (SqlTypeUtil.isBoundedVariableWidth(resultType)) { + resultType = createSqlType(resultType.getSqlTypeName(), precision); + } else { + // this catch-all case covers type variable, and both fixed + + SqlTypeName newTypeName = type.getSqlTypeName(); + + if (typeSystem.shouldConvertRaggedUnionTypesToVarying()) { + if (resultType.getPrecision() != type.getPrecision()) { + if (newTypeName == SqlTypeName.CHAR) { + newTypeName = SqlTypeName.VARCHAR; + } else if (newTypeName == SqlTypeName.BINARY) { + newTypeName = SqlTypeName.VARBINARY; + } + } + } + + resultType = createSqlType(newTypeName, precision); + } + Charset charset = null; + // TODO: refine collation combination rules + SqlCollation collation0 = + collation1 != null && collation2 != null + ? SqlCollation.getCoercibilityDyadicOperator(collation1, collation2) + : null; + SqlCollation collation = null; + if ((charset1 != null) || (charset2 != null)) { + if (charset1 == null) { + charset = charset2; + collation = collation2; + } else if (charset2 == null) { + charset = charset1; + collation = collation1; + } else if (charset1.equals(charset2)) { + charset = charset1; + collation = collation1; + } else if (charset1.contains(charset2)) { + charset = charset1; + collation = collation1; + } else { + charset = charset2; + collation = collation2; + } + } + if (charset != null) { + resultType = + createTypeWithCharsetAndCollation( + resultType, + charset, + collation0 != null + ? collation0 + : requireNonNull(collation, "collation")); + } + } else if (SqlTypeUtil.isExactNumeric(type)) { + if (SqlTypeUtil.isExactNumeric(resultType)) { + // TODO: come up with a cleaner way to support + // interval + datetime = datetime + if (types.size() > (i + 1)) { + RelDataType type1 = types.get(i + 1); + if (SqlTypeUtil.isDatetime(type1)) { + resultType = type1; + return createTypeWithNullability( + resultType, nullCount > 0 || nullableCount > 0); + } + } + if (!type.equals(resultType)) { + if (!typeName.allowsPrec() && !resultTypeName.allowsPrec()) { + // use the bigger primitive + if (type.getPrecision() > resultType.getPrecision()) { + resultType = type; + } + } else { + // Let the result type have precision (p), scale (s) + // and number of whole digits (d) as follows: d = + // max(p1 - s1, p2 - s2) s <= max(s1, s2) p = s + d + + int p1 = resultType.getPrecision(); + int p2 = type.getPrecision(); + int s1 = resultType.getScale(); + int s2 = type.getScale(); + final int maxPrecision = typeSystem.getMaxNumericPrecision(); + final int maxScale = typeSystem.getMaxNumericScale(); + + int dout = Math.max(p1 - s1, p2 - s2); + dout = Math.min(dout, maxPrecision); + + int scale = Math.max(s1, s2); + scale = Math.min(scale, maxPrecision - dout); + scale = Math.min(scale, maxScale); + + int precision = dout + scale; + assert precision <= maxPrecision; + assert precision > 0 + || (resultType.getSqlTypeName() == SqlTypeName.DECIMAL + && precision == 0 + && scale == 0); + + resultType = createSqlType(SqlTypeName.DECIMAL, precision, scale); + } + } + } else if (SqlTypeUtil.isApproximateNumeric(resultType)) { + // already approximate; promote to double just in case + // TODO: only promote when required + if (SqlTypeUtil.isDecimal(type)) { + // Only promote to double for decimal types + resultType = createDoublePrecisionType(); + } + } else { + return null; + } + } else if (SqlTypeUtil.isApproximateNumeric(type)) { + if (SqlTypeUtil.isApproximateNumeric(resultType)) { + if (type.getPrecision() > resultType.getPrecision()) { + resultType = type; + } + } else if (SqlTypeUtil.isExactNumeric(resultType)) { + if (SqlTypeUtil.isDecimal(resultType)) { + resultType = createDoublePrecisionType(); + } else { + resultType = type; + } + } else { + return null; + } + } else if (SqlTypeUtil.isInterval(type)) { + // TODO: come up with a cleaner way to support + // interval + datetime = datetime + if (types.size() > (i + 1)) { + RelDataType type1 = types.get(i + 1); + if (SqlTypeUtil.isDatetime(type1)) { + resultType = type1; + return createTypeWithNullability( + resultType, nullCount > 0 || nullableCount > 0); + } + } + + if (!type.equals(resultType)) { + // TODO jvs 4-June-2005: This shouldn't be necessary; + // move logic into IntervalSqlType.combine + Object type1 = resultType; + resultType = + ((IntervalSqlType) resultType).combine(this, (IntervalSqlType) type); + resultType = + ((IntervalSqlType) resultType).combine(this, (IntervalSqlType) type1); + } + } else if (SqlTypeUtil.isDatetime(type)) { + // TODO: come up with a cleaner way to support + // datetime +/- interval (or integer) = datetime + if (types.size() > (i + 1)) { + RelDataType type1 = types.get(i + 1); + if (SqlTypeUtil.isInterval(type1) || SqlTypeUtil.isIntType(type1)) { + resultType = type; + return createTypeWithNullability( + resultType, nullCount > 0 || nullableCount > 0); + } + } + } else { + // TODO: datetime precision details; for now we let + // leastRestrictiveByCast handle it + return null; + } + } + if (resultType != null && nullableCount > 0) { + resultType = createTypeWithNullability(resultType, true); + } + return resultType; + } + + private RelDataType createDoublePrecisionType() { + return createSqlType(SqlTypeName.DOUBLE); + } + + private RelDataType copyMultisetType(RelDataType type, boolean nullable) { + MultisetSqlType mt = (MultisetSqlType) type; + RelDataType elementType = copyType(mt.getComponentType()); + return new MultisetSqlType(elementType, nullable); + } + + private RelDataType copyIntervalType(RelDataType type, boolean nullable) { + return new IntervalSqlType( + typeSystem, + requireNonNull( + type.getIntervalQualifier(), + () -> "type.getIntervalQualifier() for " + type), + nullable); + } + + private static RelDataType copyObjectType(RelDataType type, boolean nullable) { + return new ObjectSqlType( + type.getSqlTypeName(), + type.getSqlIdentifier(), + nullable, + type.getFieldList(), + type.getComparability()); + } + + private RelDataType copyArrayType(RelDataType type, boolean nullable) { + ArraySqlType at = (ArraySqlType) type; + RelDataType elementType = copyType(at.getComponentType()); + return new ArraySqlType(elementType, nullable); + } + + private RelDataType copyMapType(RelDataType type, boolean nullable) { + MapSqlType mt = (MapSqlType) type; + RelDataType keyType = copyType(mt.getKeyType()); + RelDataType valueType = copyType(mt.getValueType()); + return new MapSqlType(keyType, valueType, nullable); + } + + @Override + protected RelDataType canonize(RelDataType type) { + type = super.canonize(type); + if (!(type instanceof ObjectSqlType)) { + return type; + } + ObjectSqlType objectType = (ObjectSqlType) type; + if (!objectType.isNullable()) { + objectType.setFamily(objectType); + } else { + objectType.setFamily((RelDataTypeFamily) createTypeWithNullability(objectType, false)); + } + return type; + } + + // FLINK MODIFICATION BEGIN + /** The unknown type. Similar to the NULL type, but is only equal to itself. */ + static class UnknownSqlType extends BasicSqlType { + UnknownSqlType(RelDataTypeFactory typeFactory) { + super(typeFactory.getTypeSystem(), SqlTypeName.NULL); + } + + @Override + protected void generateTypeString(StringBuilder sb, boolean withDetail) { + sb.append("UNKNOWN"); + } + } + // FLINK MODIFICATION END +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index b237f3db638..d05271f03c1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -305,7 +305,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { // ~ Methods ---------------------------------------------------------------- public SqlConformance getConformance() { - return config.sqlConformance(); + return config.conformance(); } @Pure @@ -539,7 +539,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { final SqlIdentifier identifier = (SqlIdentifier) selectItem; if (!identifier.isSimple()) { - if (!validator.config().sqlConformance().allowQualifyingCommonColumn()) { + if (!validator.config().conformance().allowQualifyingCommonColumn()) { validateQualifiedCommonColumn((SqlJoin) from, identifier, scope, validator); } return selectItem; @@ -3420,7 +3420,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { // a NATURAL keyword? switch (joinType) { case LEFT_SEMI_JOIN: - if (!this.config.sqlConformance().isLiberal()) { + if (!this.config.conformance().isLiberal()) { throw newValidationError( join.getJoinTypeNode(), RESOURCE.dialectDoesNotSupportFeature("LEFT SEMI JOIN")); @@ -3551,7 +3551,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { } if (select.getFrom() == null) { - if (this.config.sqlConformance().isFromRequired()) { + if (this.config.conformance().isFromRequired()) { throw newValidationError(select, RESOURCE.selectMissingFrom()); } } else { @@ -4282,7 +4282,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { return; } final AggregatingScope havingScope = (AggregatingScope) getSelectScope(select); - if (config.sqlConformance().isHavingAlias()) { + if (config.conformance().isHavingAlias()) { SqlNode newExpr = expandGroupByOrHavingExpr(having, havingScope, select, true); if (having != newExpr) { having = newExpr; @@ -4732,7 +4732,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { protected RelDataType getLogicalTargetRowType(RelDataType targetRowType, SqlInsert insert) { if (insert.getTargetColumnList() == null - && this.config.sqlConformance().isInsertSubsetColumnsAllowed()) { + && this.config.conformance().isInsertSubsetColumnsAllowed()) { // Target an implicit subset of columns. final SqlNode source = insert.getSource(); final RelDataType sourceRowType = getNamespaceOrThrow(source).getRowType(); @@ -5034,7 +5034,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { } SqlCall rowConstructor = (SqlCall) operand; - if (this.config.sqlConformance().isInsertSubsetColumnsAllowed() + if (this.config.conformance().isInsertSubsetColumnsAllowed() && targetRowType.isStruct() && rowConstructor.operandCount() < targetRowType.getFieldCount()) { targetRowType = @@ -5655,7 +5655,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { // What columns from the input are not referenced by a column in the IN // list? - final SqlValidatorNamespace inputNs = Objects.requireNonNull(getNamespace(unpivot.query)); + final SqlValidatorNamespace inputNs = requireNonNull(getNamespace(unpivot.query)); final Set<String> unusedColumnNames = catalogReader.nameMatcher().createSet(); unusedColumnNames.addAll(inputNs.getRowType().getFieldNames()); unusedColumnNames.removeAll(unpivot.usedColumnNames()); @@ -5873,7 +5873,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { if ((call.operandCount() == 0) && (operator.getSyntax() == SqlSyntax.FUNCTION_ID) && !call.isExpanded() - && !this.config.sqlConformance().allowNiladicParentheses()) { + && !this.config.conformance().allowNiladicParentheses()) { // For example, "LOCALTIME()" is illegal. (It should be // "LOCALTIME", which would have been handled as a // SqlIdentifier.) @@ -6416,7 +6416,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { // Ordinal markers, e.g. 'select a, b from t order by 2'. // Only recognize them if they are the whole expression, // and if the dialect permits. - if (literal == root && config.sqlConformance().isSortByOrdinal()) { + if (literal == root && config.conformance().isSortByOrdinal()) { switch (literal.getTypeName()) { case DECIMAL: case DOUBLE: @@ -6462,7 +6462,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { @Override public SqlNode visit(SqlIdentifier id) { // Aliases, e.g. 'select a as x, b from t order by x'. - if (id.isSimple() && config.sqlConformance().isSortByAlias()) { + if (id.isSimple() && config.conformance().isSortByAlias()) { String alias = id.getSimple(); final SqlValidatorNamespace selectNs = getNamespaceOrThrow(select); final RelDataType rowType = selectNs.getRowTypeSansSystemColumns(); @@ -6537,8 +6537,8 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { public @Nullable SqlNode visit(SqlIdentifier id) { if (id.isSimple() && (havingExpr - ? validator.config().sqlConformance().isHavingAlias() - : validator.config().sqlConformance().isGroupByAlias())) { + ? validator.config().conformance().isHavingAlias() + : validator.config().conformance().isGroupByAlias())) { String name = id.getSimple(); SqlNode expr = null; final SqlNameMatcher nameMatcher = validator.catalogReader.nameMatcher(); @@ -6579,7 +6579,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { @Override public @Nullable SqlNode visit(SqlLiteral literal) { - if (havingExpr || !validator.config().sqlConformance().isGroupByOrdinal()) { + if (havingExpr || !validator.config().conformance().isGroupByOrdinal()) { return super.visit(literal); } boolean isOrdinalLiteral = literal == root; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index 49aa96ae8f7..2b0844071c3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -220,9 +220,9 @@ import static org.apache.calcite.sql.SqlUtil.stripAs; * * <ol> * <li>Added in FLINK-29081, FLINK-28682: Lines 633 ~ 643 - * <li>Added in FLINK-28682: Lines 2095 ~ 2112 - * <li>Added in FLINK-28682: Lines 2149 ~ 2177 - * <li>Added in FLINK-20873: Lines 5198 ~ 5207 + * <li>Added in FLINK-28682: Lines 2130 ~ 2147 + * <li>Added in FLINK-28682: Lines 2184 ~ 2212 + * <li>Added in FLINK-20873: Lines 5245 ~ 5254 * </ol> */ @SuppressWarnings("UnstableApiUsage") @@ -231,7 +231,7 @@ public class SqlToRelConverter { // ~ Static fields/initializers --------------------------------------------- /** Default configuration. */ - private static final Config CONFIG = + public static final Config CONFIG = ImmutableSqlToRelConverter.Config.builder() .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) .withRelBuilderConfigTransform(c -> c.withPushJoinCondition(true)) @@ -248,7 +248,7 @@ public class SqlToRelConverter { // ~ Instance fields -------------------------------------------------------- - protected final @Nullable SqlValidator validator; + public final @Nullable SqlValidator validator; protected final RexBuilder rexBuilder; protected final Prepare.CatalogReader catalogReader; protected final RelOptCluster cluster; @@ -548,7 +548,7 @@ public class SqlToRelConverter { */ public RelNode trimUnusedFields(boolean ordered, RelNode rootRel) { // Trim fields that are not used by their consumer. - if (isTrimUnusedFields()) { + if (config.isTrimUnusedFields()) { final RelFieldTrimmer trimmer = newFieldTrimmer(); final List<RelCollation> collations = rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); @@ -698,6 +698,41 @@ public class SqlToRelConverter { /** Implementation of {@link #convertSelect(SqlSelect, boolean)}; derived class may override. */ protected void convertSelectImpl(final Blackboard bb, SqlSelect select) { convertFrom(bb, select.getFrom()); + + // We would like to remove ORDER BY clause from an expanded view, except if + // it is top-level or affects semantics. + // + // Top-level example. Given the view definition + // CREATE VIEW v AS SELECT * FROM t ORDER BY x + // we would retain the view's ORDER BY in + // SELECT * FROM v + // or + // SELECT * FROM v WHERE y = 5 + // but remove the view's ORDER BY in + // SELECT * FROM v ORDER BY z + // and + // SELECT deptno, COUNT(*) FROM v GROUP BY deptno + // because the ORDER BY and GROUP BY mean that the view is not 'top level' in + // the query. + // + // Semantics example. Given the view definition + // CREATE VIEW v2 AS SELECT * FROM t ORDER BY x LIMIT 10 + // we would never remove the ORDER BY, because "ORDER BY ... LIMIT" is about + // semantics. It is not a 'pure order'. + if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && config.isRemoveSortInSubQuery()) { + // Remove the Sort if the view is at the top level. Also remove the Sort + // if there are other nodes, which will cause the view to be in the + // sub-query. + if (!bb.top + || validator().isAggregate(select) + || select.isDistinct() + || select.hasOrderBy() + || select.getFetch() != null + || select.getOffset() != null) { + bb.setRoot(castNonNull(bb.root).getInput(0), true); + } + } + convertWhere(bb, select.getWhere()); final List<SqlNode> orderExprList = new ArrayList<>(); @@ -2299,7 +2334,7 @@ public class SqlToRelConverter { } RelNode child = (null != bb.root) ? bb.root : LogicalValues.createOneRow(cluster); RelNode uncollect; - if (validator().config().sqlConformance().allowAliasUnnestItems()) { + if (validator().config().conformance().allowAliasUnnestItems()) { uncollect = relBuilder .push(child) @@ -2656,15 +2691,19 @@ public class SqlToRelConverter { SqlValidatorUtil.getExtendedColumns(validator, validatorTable, extendedColumns); table = table.extend(extendedFields); } - final RelNode tableRel; // Review Danny 2020-01-13: hacky to construct a new table scan // in order to apply the hint strategies. final List<RelHint> hints = hintStrategies.apply( SqlUtil.getRelHint(hintStrategies, tableHints), LogicalTableScan.create(cluster, table, ImmutableList.of())); - tableRel = toRel(table, hints); + final RelNode tableRel = toRel(table, hints); bb.setRoot(tableRel, true); + + if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && removeSortInSubQuery(bb.top)) { + bb.setRoot(castNonNull(bb.root).getInput(0), true); + } + if (usedDataset[0]) { bb.setDataset(datasetName); } @@ -2813,7 +2852,8 @@ public class SqlToRelConverter { requiredCols = ImmutableBitSet.fromBitSet(shuttle.varCols).union(p.requiredColumns); } - return LogicalCorrelate.create(leftRel, innerRel, p.id, requiredCols, joinType); + return LogicalCorrelate.create( + leftRel, innerRel, ImmutableList.of(), p.id, requiredCols, joinType); } final RelNode node = @@ -3937,7 +3977,7 @@ public class SqlToRelConverter { final RelDataType tableRowType = targetTable.getRowType(); SqlNodeList targetColumnList = call.getTargetColumnList(); if (targetColumnList == null) { - if (validator().config().sqlConformance().isInsertSubsetColumnsAllowed()) { + if (validator().config().conformance().isInsertSubsetColumnsAllowed()) { final RelDataType targetRowType = typeFactory.createStructType( tableRowType @@ -4163,17 +4203,12 @@ public class SqlToRelConverter { } else { qualified = SqlQualified.create(null, 1, null, identifier); } - final Pair<RexNode, @Nullable Map<String, Integer>> e0 = - requireNonNull( - bb.lookupExp(qualified), () -> "no expression found for " + qualified); + final Pair<RexNode, @Nullable BiFunction<RexNode, String, RexNode>> e0 = + bb.lookupExp(qualified); RexNode e = e0.left; for (String name : qualified.suffix()) { if (e == e0.left && e0.right != null) { - Integer i = - requireNonNull( - e0.right.get(name), - () -> "e0.right.get(name) produced null for " + name); - e = rexBuilder.makeFieldAccess(e, i); + e = e0.right.apply(e, name); } else { final boolean caseSensitive = true; // name already fully-qualified if (identifier.isStar() && bb.scope instanceof MatchRecognizeScope) { @@ -4649,7 +4684,9 @@ public class SqlToRelConverter { mapRootRelToFieldProjection.put(newLeftInput, currentProjection); } - setRoot(newLeftInput, false); + // if the original root rel is a leaf rel, the new root should be a leaf. + // otherwise the field offset will be wrong. + setRoot(newLeftInput, leaves.remove(root()) != null); // right fields appear after the LHS fields. final int rightOffset = @@ -4758,13 +4795,13 @@ public class SqlToRelConverter { } /** - * Returns an expression with which to reference a from-list item. + * Returns an expression with which to reference a from-list item; throws if not found. * - * @param qualified the alias of the from item - * @return a {@link RexFieldAccess} or {@link RexRangeRef}, or null if not found + * @param qualified The alias of the FROM item + * @return a {@link RexFieldAccess} or {@link RexRangeRef}, never null */ - @Nullable - Pair<RexNode, @Nullable Map<String, Integer>> lookupExp(SqlQualified qualified) { + Pair<RexNode, @Nullable BiFunction<RexNode, String, RexNode>> lookupExp( + SqlQualified qualified) { if (nameToNodeMap != null && qualified.prefixLength == 1) { RexNode node = nameToNodeMap.get(qualified.identifier.names.get(0)); if (node == null) { @@ -4779,8 +4816,12 @@ public class SqlToRelConverter { scope().getValidator().getCatalogReader().nameMatcher(); final SqlValidatorScope.ResolvedImpl resolved = new SqlValidatorScope.ResolvedImpl(); scope().resolve(qualified.prefix(), nameMatcher, false, resolved); - if (!(resolved.count() == 1)) { - return null; + if (resolved.count() != 1) { + throw new AssertionError( + "no unique expression found for " + + qualified + + "; count is " + + resolved.count()); } final SqlValidatorScope.Resolve resolve = resolved.only(); final RelDataType rowType = resolve.rowType(); @@ -4793,18 +4834,16 @@ public class SqlToRelConverter { if ((inputs != null) && !isParent) { final LookupContext rels = new LookupContext(this, inputs, systemFieldList.size()); final RexNode node = lookup(resolve.path.steps().get(0).i, rels); - if (node == null) { - return null; - } else { - final Map<String, Integer> fieldOffsets = new HashMap<>(); - for (RelDataTypeField f : resolve.rowType().getFieldList()) { - if (!fieldOffsets.containsKey(f.getName())) { - fieldOffsets.put(f.getName(), f.getIndex()); - } - } - final Map<String, Integer> map = ImmutableMap.copyOf(fieldOffsets); - return Pair.of(node, map); - } + assert node != null; + return Pair.of( + node, + (e, fieldName) -> { + final RelDataTypeField field = + requireNonNull( + rowType.getField(fieldName, true, false), + () -> "field " + fieldName); + return rexBuilder.makeFieldAccess(e, field.getIndex()); + }); } else { // We're referencing a relational expression which has not been // converted yet. This occurs when from items are correlated, @@ -4833,7 +4872,15 @@ public class SqlToRelConverter { offset += c.getRowType().getFieldCount(); } final RexNode c = rexBuilder.makeCorrel(builder.uniquify().build(), correlId); - return Pair.of(c, fields.build()); + final ImmutableMap<String, Integer> fieldMap = fields.build(); + return Pair.of( + c, + (e, fieldName) -> { + final int j = + requireNonNull( + fieldMap.get(fieldName), "field " + fieldName); + return rexBuilder.makeFieldAccess(e, j); + }); } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java index 98c8d7ce3ca..00e5075e3c5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java @@ -154,7 +154,7 @@ import static org.apache.calcite.util.Static.RESOURCE; * <p>FLINK modifications are at lines * * <ol> - * <li>Should be removed after fix of FLINK-29804: Lines 2927 ~ 2930 + * <li>Should be removed after fix of FLINK-29804: Lines 2928 ~ 2931 * </ol> */ @Value.Enclosing @@ -2879,7 +2879,8 @@ public class RelBuilder { tableSpool(Spool.Type.LAZY, Spool.Type.LAZY, finder.relOptTable).build(); RelNode seed = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY, finder.relOptTable).build(); RelNode repeatUnion = - struct.repeatUnionFactory.createRepeatUnion(seed, iterative, all, iterationLimit); + struct.repeatUnionFactory.createRepeatUnion( + seed, iterative, all, iterationLimit, finder.relOptTable); return push(repeatUnion); } @@ -2962,7 +2963,7 @@ public class RelBuilder { final ImmutableBitSet requiredColumns = RelOptUtil.correlationColumns(id, right.rel); join = struct.correlateFactory.createCorrelate( - left.rel, right.rel, id, requiredColumns, joinType); + left.rel, right.rel, ImmutableList.of(), id, requiredColumns, joinType); } else { RelNode join0 = struct.joinFactory.createJoin( @@ -3020,6 +3021,7 @@ public class RelBuilder { struct.correlateFactory.createCorrelate( left.rel, right.rel, + ImmutableList.of(), correlationId, ImmutableBitSet.of(requiredOrdinals), joinType); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java index cbb7df05680..c5111219283 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java @@ -45,6 +45,7 @@ import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUnknownAs; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; @@ -90,6 +91,7 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_INTERNAL_NAME; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_KIND; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NAME; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NULL_AS; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG; @@ -217,9 +219,18 @@ final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> { builder.add(range); } } - final boolean containsNull = sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue(); - return rexBuilder.makeSearchArgumentLiteral( - Sarg.of(containsNull, builder.build()), relDataType); + // TODO: Since 1.18.0 nothing is serialized to FIELD_NAME_CONTAINS_NULL. + // This if condition (should be removed in future) is required for backward compatibility + // with Flink 1.17.x where FIELD_NAME_CONTAINS_NULL is still used. + final JsonNode containsNull = sargNode.get(FIELD_NAME_CONTAINS_NULL); + if (containsNull != null) { + return rexBuilder.makeSearchArgumentLiteral( + Sarg.of(containsNull.booleanValue(), builder.build()), relDataType); + } + final RexUnknownAs nullAs = + serializableToCalcite( + RexUnknownAs.class, sargNode.required(FIELD_NAME_NULL_AS).asText()); + return rexBuilder.makeSearchArgumentLiteral(Sarg.of(nullAs, builder.build()), relDataType); } private static @Nullable Object deserializeLiteralValue( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java index 5e788723cd3..f0b6bf80fae 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java @@ -93,7 +93,9 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> { static final String FIELD_NAME_BOUND_LOWER = "lower"; static final String FIELD_NAME_BOUND_UPPER = "upper"; static final String FIELD_NAME_BOUND_TYPE = "boundType"; + static final String FIELD_NAME_CONTAINS_NULL = "containsNull"; + static final String FIELD_NAME_NULL_AS = "nullAs"; // Symbol fields static final String FIELD_NAME_SYMBOL = "symbol"; @@ -312,7 +314,8 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> { gen.writeEndObject(); } gen.writeEndArray(); - gen.writeBooleanField(FIELD_NAME_CONTAINS_NULL, value.containsNull); + final SerializableSymbol symbol = calciteToSerializable(value.nullAs); + gen.writeStringField(FIELD_NAME_NULL_AS, symbol.getValue()); gen.writeEndObject(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java index 8724ea45cfa..1d9ab77b635 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java @@ -33,6 +33,7 @@ import org.apache.flink.table.utils.DateTimeUtils; import com.google.common.collect.BoundType; import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.rex.RexUnknownAs; import org.apache.calcite.sql.SqlJsonConstructorNullClause; import org.apache.calcite.sql.SqlJsonEmptyOrError; import org.apache.calcite.sql.SqlJsonExistsErrorBehavior; @@ -473,6 +474,11 @@ public final class SymbolUtil { // BOUND addSymbolMapping(null, null, BoundType.OPEN, "BOUND", "OPEN"); addSymbolMapping(null, null, BoundType.CLOSED, "BOUND", "CLOSED"); + + // UNKNOWN_AS + addSymbolMapping(null, null, RexUnknownAs.TRUE, "UNKNOWN_AS", "TRUE"); + addSymbolMapping(null, null, RexUnknownAs.FALSE, "UNKNOWN_AS", "FALSE"); + addSymbolMapping(null, null, RexUnknownAs.UNKNOWN, "UNKNOWN_AS", "UNKNOWN"); } /** diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE index eddbcab7247..bdeec088414 100644 --- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE +++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE @@ -9,8 +9,8 @@ This project bundles the following dependencies under the Apache Software Licens - com.google.guava:guava:29.0-jre - com.google.guava:failureaccess:1.0.1 - com.esri.geometry:esri-geometry-api:2.2.0 -- org.apache.calcite:calcite-core:1.29.0 -- org.apache.calcite:calcite-linq4j:1.29.0 +- org.apache.calcite:calcite-core:1.30.0 +- org.apache.calcite:calcite-linq4j:1.30.0 - org.apache.calcite.avatica:avatica-core:1.20.0 - commons-codec:commons-codec:1.15 - commons-io:commons-io:2.11.0 diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala index a7e7ff3ea2a..652a177bb98 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala @@ -174,6 +174,7 @@ object FlinkLogicalRelFactories { def createCorrelate( left: RelNode, right: RelNode, + hints: util.List[RelHint], correlationId: CorrelationId, requiredColumns: ImmutableBitSet, joinType: JoinRelType): RelNode = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala index e7cfb468a60..82fd2a9a682 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala @@ -422,7 +422,11 @@ class FlinkTypeFactory( new GenericRelDataType(generic.genericType, isNullable, typeSystem) case it: TimeIndicatorRelDataType => - new TimeIndicatorRelDataType(it.typeSystem, it.originalType, isNullable, it.isEventTime) + new TimeIndicatorRelDataType( + it.typeSystemField, + it.originalType, + isNullable, + it.isEventTime) // for nested rows we keep the nullability property, // top-level rows fall back to Calcite's default handling diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala index 47dd5734402..1ba6643dc29 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala @@ -23,6 +23,7 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert import org.apache.flink.sql.parser.dql.SqlRichExplain import org.apache.flink.table.api.ValidationException import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported} +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable} import org.apache.flink.util.Preconditions.checkArgument diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala index 3789f24ab9b..a4076db6adf 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala @@ -28,11 +28,11 @@ import java.lang * basic SQL type. */ class TimeIndicatorRelDataType( - val typeSystem: RelDataTypeSystem, + val typeSystemField: RelDataTypeSystem, val originalType: BasicSqlType, val nullable: Boolean, val isEventTime: Boolean) - extends BasicSqlType(typeSystem, originalType.getSqlTypeName, originalType.getPrecision) { + extends BasicSqlType(typeSystemField, originalType.getSqlTypeName, originalType.getPrecision) { this.isNullable = nullable computeDigest() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java index 23652b1fcad..b763d92cdec 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java @@ -114,4 +114,18 @@ public class CalcJsonPlanTest extends TableTestBase { + "from MyTable where " + "(udf1(a) > 0 or (a * b) < 100) and b > 10"); } + + @Test + public void testSarg() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + String sql = "insert into MySink SELECT a from MyTable where a = 1 or a = 2 or a is null"; + util.verifyJsonPlan(sql); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java new file mode 100644 index 00000000000..1320fa24920 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.jsonplan; + +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.utils.JsonPlanTestBase; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** Test for Sarg JsonPlan ser/de. */ +public class SargJsonPlanITCase extends JsonPlanTestBase { + @Test + public void testSarg() throws ExecutionException, InterruptedException { + List<Row> data = + Arrays.asList(Row.of(1), Row.of(2), Row.of((Integer) null), Row.of(4), Row.of(5)); + createTestValuesSourceTable("MyTable", data, "a int"); + createTestNonInsertOnlyValuesSinkTable("`result`", "a int"); + String sql = + "insert into `result` SELECT a\n" + + "FROM MyTable WHERE a = 1 OR a = 2 OR a IS NULL"; + compileSqlAndExecutePlan(sql).await(); + List<String> expected = Arrays.asList("+I[1]", "+I[2]", "+I[null]"); + assertResult(expected, TestValuesTableFactory.getResults("result")); + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest_jsonplan/testSarg.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest_jsonplan/testSarg.out new file mode 100644 index 00000000000..f39a0d6b95d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest_jsonplan/testSarg.out @@ -0,0 +1,157 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "TIMESTAMP(3)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "bounded" : "false" + } + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ ] + }, { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ] ], + "producedType" : "ROW<`a` BIGINT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` BIGINT> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` BIGINT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a], metadata=[]]], fields=[a])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + } ], + "condition" : { + "kind" : "CALL", + "syntax" : "INTERNAL", + "internalName" : "$SEARCH$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "LITERAL", + "sarg" : { + "ranges" : [ { + "lower" : { + "value" : 1, + "boundType" : "CLOSED" + }, + "upper" : { + "value" : 1, + "boundType" : "CLOSED" + } + }, { + "lower" : { + "value" : 2, + "boundType" : "CLOSED" + }, + "upper" : { + "value" : 2, + "boundType" : "CLOSED" + } + } ], + "nullAs" : "TRUE" + }, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN NOT NULL" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT>", + "description" : "Calc(select=[a], where=[SEARCH(a, Sarg[1, 2; NULL AS TRUE])])" + }, { + "id" : 3, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT", + "connector" : "values" + } + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE index 06e50d09bb2..c3ed918655d 100644 --- a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE +++ b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE @@ -6,6 +6,6 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- com.jayway.jsonpath:json-path:2.6.0 +- com.jayway.jsonpath:json-path:2.7.0 - org.codehaus.janino:janino:3.1.9 - org.codehaus.janino:commons-compiler:3.1.9 diff --git a/flink-table/pom.xml b/flink-table/pom.xml index ab5e343fdfb..0efbed1fad7 100644 --- a/flink-table/pom.xml +++ b/flink-table/pom.xml @@ -77,12 +77,12 @@ under the License. </dependencyManagement> <properties> - <calcite.version>1.29.0</calcite.version> - <!-- Calcite 1.29.0 depends on 3.1.6, + <calcite.version>1.30.0</calcite.version> + <!-- Calcite 1.30.0 depends on 3.1.6, at the same time minimum 3.1.x Janino version passing Flink tests is 3.1.9, more details are in FLINK-27995 --> <janino.version>3.1.9</janino.version> - <jsonpath.version>2.6.0</jsonpath.version> + <jsonpath.version>2.7.0</jsonpath.version> <guava.version>29.0-jre</guava.version> </properties> </project>
