This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 423cdcb99c3f66be435ad2e70d6a15f10a69e252
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Feb 13 23:07:44 2023 +0100

    [FLINK-27998][table] Upgrade Calcite from 1.29.0 to 1.30.0
    
    This closes apache/flink#21934
---
 flink-table/flink-sql-parser/pom.xml               |   4 +-
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     |   8 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |   7 +-
 .../flink/sql/parser/FlinkSqlUnParserTest.java     |  15 +-
 flink-table/flink-table-planner/pom.xml            |   6 +-
 .../org/apache/calcite/rel/core/Correlate.java     | 296 -----------
 .../calcite/rel/logical/LogicalCorrelate.java      | 188 -------
 .../org/apache/calcite/sql/fun/SqlRowOperator.java | 128 +++++
 .../calcite/sql/type/SqlTypeFactoryImpl.java       | 554 +++++++++++++++++++++
 .../calcite/sql/validate/SqlValidatorImpl.java     |  28 +-
 .../apache/calcite/sql2rel/SqlToRelConverter.java  | 127 +++--
 .../java/org/apache/calcite/tools/RelBuilder.java  |   8 +-
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  17 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |   5 +-
 .../flink/table/planner/typeutils/SymbolUtil.java  |   6 +
 .../src/main/resources/META-INF/NOTICE             |   4 +-
 .../planner/calcite/FlinkLogicalRelFactories.scala |   1 +
 .../table/planner/calcite/FlinkTypeFactory.scala   |   6 +-
 .../planner/calcite/PreValidateReWriter.scala      |   1 +
 .../plan/schema/TimeIndicatorRelDataType.scala     |   4 +-
 .../plan/nodes/exec/stream/CalcJsonPlanTest.java   |  14 +
 .../stream/jsonplan/SargJsonPlanITCase.java        |  46 ++
 .../stream/CalcJsonPlanTest_jsonplan/testSarg.out  | 157 ++++++
 .../src/main/resources/META-INF/NOTICE             |   2 +-
 flink-table/pom.xml                                |   6 +-
 25 files changed, 1063 insertions(+), 575 deletions(-)

diff --git a/flink-table/flink-sql-parser/pom.xml b/flink-table/flink-sql-parser/pom.xml
index 4412ade6072..4da406fe319 100644
--- a/flink-table/flink-sql-parser/pom.xml
+++ b/flink-table/flink-sql-parser/pom.xml
@@ -62,9 +62,9 @@ under the License.
                        <version>${calcite.version}</version>
                        <exclusions>
                                <!--
-                               "mvn dependency:tree" as of Calcite 1.29.0:
+                               "mvn dependency:tree" as of Calcite 1.30.0:
 
-                               [INFO] +- org.apache.calcite:calcite-core:jar:1.29.0:compile
+                               [INFO] +- org.apache.calcite:calcite-core:jar:1.30.0:compile
                                [INFO] |  +- org.apache.calcite.avatica:avatica-core:jar:1.20.0:compile
                                [INFO] |  \- org.checkerframework:checker-qual:jar:3.10.0:compile
 
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
index 1fe0d63ba41..3df21df7abc 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -547,12 +547,12 @@ class FlinkDDLDataTypeTest {
         private final SqlParser.Config parserConfig;
 
         TestFactory() {
-            this(DEFAULT_OPTIONS, MockCatalogReaderSimple::new, SqlValidatorUtil::newValidator);
+            this(DEFAULT_OPTIONS, MockCatalogReaderSimple::create, SqlValidatorUtil::newValidator);
         }
 
         TestFactory(
                 Map<String, Object> options,
-                SqlTestFactory.MockCatalogReaderFactory catalogReaderFactory,
+                SqlTestFactory.CatalogReaderFactory catalogReaderFactory,
                 SqlTestFactory.ValidatorFactory validatorFactory) {
             this.options = options;
             this.validatorFactory = validatorFactory;
@@ -560,7 +560,7 @@ class FlinkDDLDataTypeTest {
                     createOperatorTable((SqlOperatorTable) options.get("operatorTable"));
             this.typeFactory = createTypeFactory((SqlConformance) options.get("conformance"));
             Boolean caseSensitive = (Boolean) options.get("caseSensitive");
-            this.catalogReader = catalogReaderFactory.create(typeFactory, caseSensitive).init();
+            this.catalogReader = catalogReaderFactory.create(typeFactory, caseSensitive);
             this.parserConfig = createParserConfig(options);
         }
 
@@ -589,7 +589,7 @@ class FlinkDDLDataTypeTest {
                     catalogReader,
                     typeFactory,
                     SqlValidator.Config.DEFAULT
-                            .withSqlConformance(conformance)
+                            .withConformance(conformance)
                             .withTypeCoercionEnabled(enableTypeCoercion));
         }
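
For context, Calcite 1.30 reworked the test-support hooks used in this file: mock catalog readers are now obtained through a static create factory that initializes internally (replacing the constructor-plus-init() pattern), and SqlValidator.Config.withSqlConformance became withConformance. A minimal sketch of the new calls, assuming Calcite's test-jar classes and a type factory and conformance already in scope:

    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.sql.validate.SqlConformance;
    import org.apache.calcite.sql.validate.SqlValidator;
    import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
    import org.apache.calcite.test.catalog.MockCatalogReaderSimple;

    class ValidatorSetupSketch {
        // Sketch only: mirrors the post-upgrade calls in FlinkDDLDataTypeTest.
        static SqlValidator.Config configFor(SqlConformance conformance, boolean coerce) {
            return SqlValidator.Config.DEFAULT
                    .withConformance(conformance) // was withSqlConformance(...) before 1.30
                    .withTypeCoercionEnabled(coerce);
        }

        static SqlValidatorCatalogReader readerFor(RelDataTypeFactory typeFactory) {
            // create(...) performs the setup that previously required a trailing .init().
            return MockCatalogReaderSimple.create(typeFactory, true);
        }
    }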
 
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index b271805399d..86901433b68 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
 
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParserImplFactory;
+import org.apache.calcite.sql.parser.SqlParserFixture;
 import org.apache.calcite.sql.parser.SqlParserTest;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -42,9 +42,8 @@ import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
 @Execution(CONCURRENT)
 class FlinkSqlParserImplTest extends SqlParserTest {
 
-    @Override
-    protected SqlParserImplFactory parserImplFactory() {
-        return FlinkSqlParserImpl.FACTORY;
+    public SqlParserFixture fixture() {
+        return super.fixture().withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));
     }
 
     @Test
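
Calcite 1.30 also dropped SqlParserTest's parserImplFactory() and tester overrides in favor of a single composable fixture(), as seen above. A hedged sketch of the resulting test pattern (the test body is illustrative, not part of this commit, and assumes the fixture's sql(...).ok(...) check style):

    import org.apache.calcite.sql.parser.SqlParserFixture;
    import org.apache.calcite.sql.parser.SqlParserTest;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
    import org.junit.jupiter.api.Test;

    class ParserFixtureSketch extends SqlParserTest {
        @Override
        public SqlParserFixture fixture() {
            // Every check in the inherited suite now flows through this fixture.
            return super.fixture()
                    .withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));
        }

        @Test
        void testSelect() {
            // Parses with the configured factory and asserts the unparsed form.
            fixture().sql("select 1").ok("SELECT 1");
        }
    }
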
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
index 09142717241..cced9d2e440 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.sql.parser;
 
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.sql.parser.SqlParserFixture;
 import org.junit.jupiter.api.parallel.Execution;
 
 import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
@@ -34,13 +37,9 @@ class FlinkSqlUnParserTest extends FlinkSqlParserImplTest {
 
     // ~ Methods ----------------------------------------------------------------
 
-    @Override
-    protected boolean isUnparserTest() {
-        return true;
-    }
-
-    @Override
-    protected Tester getTester() {
-        return new UnparsingTesterImpl();
+    public SqlParserFixture fixture() {
+        return super.fixture()
+                .withTester(new UnparsingTesterImpl())
+                .withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));
     }
 }
diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml
index 7bf89eef10e..9cb54e44b80 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -129,10 +129,10 @@ under the License.
                        <version>${calcite.version}</version>
                        <exclusions>
                                <!--
-                               "mvn dependency:tree" as of Calcite 1.29.0:
+                               "mvn dependency:tree" as of Calcite 1.30.0:
 
-                               [INFO] +- org.apache.calcite:calcite-core:jar:1.29.0:compile
-                               [INFO] |  +- org.apache.calcite:calcite-linq4j:jar:1.29.0:compile
+                               [INFO] +- org.apache.calcite:calcite-core:jar:1.30.0:compile
+                               [INFO] |  +- org.apache.calcite:calcite-linq4j:jar:1.30.0:compile
                                [INFO] |  +- com.esri.geometry:esri-geometry-api:jar:2.2.0:compile
                                [INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.13.4:compile
                                [INFO] |  +- org.apache.calcite.avatica:avatica-core:jar:1.20.0:compile
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Correlate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Correlate.java
deleted file mode 100644
index 180010eabb5..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Correlate.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.rel.core;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.hint.Hintable;
-import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Litmus;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A relational operator that performs nested-loop joins.
- *
- * <p>Temporarily copy from calcite to cherry-pick [CALCITE-5107] and will be removed when upgrade
- * the latest calcite.
- *
- * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join}, but works by setting
- * variables in its environment and restarting its right-hand input.
- *
- * <p>Correlate is not a join since: typical rules should not match Correlate.
- *
- * <p>A Correlate is used to represent a correlated query. One implementation strategy is to
- * de-correlate the expression.
- *
- * <table>
- *   <caption>Mapping of physical operations to logical ones</caption>
- *   <tr><th>Physical operation</th><th>Logical operation</th></tr>
- *   <tr><td>NestedLoops</td><td>Correlate(A, B, regular)</td></tr>
- *   <tr><td>NestedLoopsOuter</td><td>Correlate(A, B, outer)</td></tr>
- *   <tr><td>NestedLoopsSemi</td><td>Correlate(A, B, semi)</td></tr>
- *   <tr><td>NestedLoopsAnti</td><td>Correlate(A, B, anti)</td></tr>
- *   <tr><td>HashJoin</td><td>EquiJoin(A, B)</td></tr>
- *   <tr><td>HashJoinOuter</td><td>EquiJoin(A, B, outer)</td></tr>
- *   <tr><td>HashJoinSemi</td><td>SemiJoin(A, B, semi)</td></tr>
- *   <tr><td>HashJoinAnti</td><td>SemiJoin(A, B, anti)</td></tr>
- * </table>
- *
- * @see CorrelationId
- */
-public abstract class Correlate extends BiRel implements Hintable {
-    // ~ Instance fields --------------------------------------------------------
-
-    protected final CorrelationId correlationId;
-    protected final ImmutableBitSet requiredColumns;
-    protected final JoinRelType joinType;
-    protected final ImmutableList<RelHint> hints;
-
-    // ~ Constructors -----------------------------------------------------------
-
-    /**
-     * Creates a Correlate.
-     *
-     * @param cluster Cluster this relational expression belongs to
-     * @param hints Hints for this node
-     * @param left Left input relational expression
-     * @param right Right input relational expression
-     * @param correlationId Variable name for the row of left input
-     * @param requiredColumns Set of columns that are used by correlation
-     * @param joinType Join type
-     */
-    @SuppressWarnings("method.invocation.invalid")
-    protected Correlate(
-            RelOptCluster cluster,
-            RelTraitSet traitSet,
-            List<RelHint> hints,
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        super(cluster, traitSet, left, right);
-        assert !joinType.generatesNullsOnLeft() : "Correlate has invalid join type " + joinType;
-        this.joinType = requireNonNull(joinType, "joinType");
-        this.correlationId = requireNonNull(correlationId, "correlationId");
-        this.requiredColumns = requireNonNull(requiredColumns, "requiredColumns");
-        this.hints = ImmutableList.copyOf(hints);
-    }
-
-    /**
-     * Creates a Correlate.
-     *
-     * @param cluster Cluster this relational expression belongs to
-     * @param left Left input relational expression
-     * @param right Right input relational expression
-     * @param correlationId Variable name for the row of left input
-     * @param requiredColumns Set of columns that are used by correlation
-     * @param joinType Join type
-     */
-    protected Correlate(
-            RelOptCluster cluster,
-            RelTraitSet traitSet,
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        this(
-                cluster,
-                traitSet,
-                Collections.emptyList(),
-                left,
-                right,
-                correlationId,
-                requiredColumns,
-                joinType);
-    }
-
-    /**
-     * Creates a Correlate by parsing serialized output.
-     *
-     * @param input Input representation
-     */
-    protected Correlate(RelInput input) {
-        this(
-                input.getCluster(),
-                input.getTraitSet(),
-                input.getInputs().get(0),
-                input.getInputs().get(1),
-                new CorrelationId(
-                        requireNonNull((Integer) input.get("correlation"), "correlation")),
-                input.getBitSet("requiredColumns"),
-                requireNonNull(input.getEnum("joinType", JoinRelType.class), "joinType"));
-    }
-
-    // ~ Methods ----------------------------------------------------------------
-
-    @Override
-    public boolean isValid(Litmus litmus, RelNode.Context context) {
-        ImmutableBitSet leftColumns = ImmutableBitSet.range(left.getRowType().getFieldCount());
-        return super.isValid(litmus, context)
-                && litmus.check(
-                        leftColumns.contains(requiredColumns),
-                        "Required columns {} not subset of left columns {}",
-                        requiredColumns,
-                        leftColumns)
-                && RelOptUtil.notContainsCorrelation(left, correlationId, litmus);
-    }
-
-    @Override
-    public Correlate copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        assert inputs.size() == 2;
-        return copy(
-                traitSet, inputs.get(0), inputs.get(1), correlationId, requiredColumns, joinType);
-    }
-
-    public abstract Correlate copy(
-            RelTraitSet traitSet,
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType);
-
-    public JoinRelType getJoinType() {
-        return joinType;
-    }
-
-    @Override
-    protected RelDataType deriveRowType() {
-        switch (joinType) {
-            case LEFT:
-            case INNER:
-                return SqlValidatorUtil.deriveJoinRowType(
-                        left.getRowType(),
-                        right.getRowType(),
-                        joinType,
-                        getCluster().getTypeFactory(),
-                        null,
-                        ImmutableList.of());
-            case ANTI:
-            case SEMI:
-                return left.getRowType();
-            default:
-                throw new IllegalStateException("Unknown join type " + joinType);
-        }
-    }
-
-    @Override
-    public RelWriter explainTerms(RelWriter pw) {
-        return super.explainTerms(pw)
-                .item("correlation", correlationId)
-                .item("joinType", joinType.lowerName)
-                .item("requiredColumns", requiredColumns);
-    }
-
-    /**
-     * Returns the correlating expressions.
-     *
-     * @return correlating expressions
-     */
-    public CorrelationId getCorrelationId() {
-        return correlationId;
-    }
-
-    @Override
-    public String getCorrelVariable() {
-        return correlationId.getName();
-    }
-
-    /**
-     * Returns the required columns in left relation required for the correlation in the right.
-     *
-     * @return columns in left relation required for the correlation in the right
-     */
-    public ImmutableBitSet getRequiredColumns() {
-        return requiredColumns;
-    }
-
-    @Override
-    public Set<CorrelationId> getVariablesSet() {
-        return ImmutableSet.of(correlationId);
-    }
-
-    @Override
-    public double estimateRowCount(RelMetadataQuery mq) {
-        double leftRowCount = mq.getRowCount(left);
-        switch (joinType) {
-            case SEMI:
-            case ANTI:
-                return leftRowCount;
-            default:
-                return leftRowCount * mq.getRowCount(right);
-        }
-    }
-
-    @Override
-    public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(this);
-
-        final double rightRowCount = right.estimateRowCount(mq);
-        final double leftRowCount = left.estimateRowCount(mq);
-        if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) {
-            return planner.getCostFactory().makeInfiniteCost();
-        }
-
-        Double restartCount = mq.getRowCount(getLeft());
-        if (restartCount == null) {
-            return planner.getCostFactory().makeInfiniteCost();
-        }
-        // RelMetadataQuery.getCumulativeCost(getRight()); does not work for
-        // RelSubset, so we ask planner to cost-estimate right relation
-        RelOptCost rightCost = planner.getCost(getRight(), mq);
-        if (rightCost == null) {
-            return planner.getCostFactory().makeInfiniteCost();
-        }
-        RelOptCost rescanCost = rightCost.multiplyBy(Math.max(1.0, restartCount - 1));
-
-        return planner.getCostFactory()
-                .makeCost(
-                        rowCount /* generate results */ + leftRowCount /* scan left results */,
-                        0,
-                        0)
-                .plus(rescanCost);
-    }
-
-    @Override
-    public ImmutableList<RelHint> getHints() {
-        return hints;
-    }
-}
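
The computeSelfCost above models the nested-loop rescan: the right input is evaluated once and then restarted for each remaining left row, so its cost is scaled by max(1, restartCount - 1) and added to the row-generation and left-scan component. A small arithmetic sketch of that formula, with invented row counts:

    // Illustrative numbers only; mirrors the rescan-cost formula in the deleted
    // Correlate.computeSelfCost above.
    public class CorrelateCostSketch {
        public static void main(String[] args) {
            double leftRowCount = 100.0; // rows from the left input (also the restart count)
            double rightCost = 50.0;     // planner cost of one pass over the right input
            double rowCount = 1_000.0;   // estimated output rows of the correlate

            double rescanCost = rightCost * Math.max(1.0, leftRowCount - 1); // 50 * 99 = 4950
            double total = (rowCount /* generate results */ + leftRowCount /* scan left */) + rescanCost;
            System.out.println(total); // 6050.0
        }
    }
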
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
deleted file mode 100644
index fe56ba5b600..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.rel.logical;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttle;
-import org.apache.calcite.rel.core.Correlate;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.util.ImmutableBitSet;
-
-import java.util.Collections;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A relational operator that performs nested-loop joins.
- *
- * <p>Temporarily copy from calcite to cherry-pick [CALCITE-5107] and will be removed when upgrade
- * the latest calcite.
- *
- * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join}, but works by setting
- * variables in its environment and restarting its right-hand input.
- *
- * <p>A LogicalCorrelate is used to represent a correlated query. One implementation strategy is to
- * de-correlate the expression.
- *
- * @see org.apache.calcite.rel.core.CorrelationId
- */
-public final class LogicalCorrelate extends Correlate {
-    // ~ Instance fields --------------------------------------------------------
-
-    // ~ Constructors -----------------------------------------------------------
-
-    /**
-     * Creates a LogicalCorrelate.
-     *
-     * @param cluster cluster this relational expression belongs to
-     * @param hints hints for this node
-     * @param left left input relational expression
-     * @param right right input relational expression
-     * @param correlationId variable name for the row of left input
-     * @param requiredColumns Required columns
-     * @param joinType join type
-     */
-    public LogicalCorrelate(
-            RelOptCluster cluster,
-            RelTraitSet traitSet,
-            List<RelHint> hints,
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        super(cluster, traitSet, hints, left, right, correlationId, requiredColumns, joinType);
-    }
-
-    /**
-     * Creates a LogicalCorrelate.
-     *
-     * @param cluster cluster this relational expression belongs to
-     * @param left left input relational expression
-     * @param right right input relational expression
-     * @param correlationId variable name for the row of left input
-     * @param requiredColumns Required columns
-     * @param joinType join type
-     */
-    public LogicalCorrelate(
-            RelOptCluster cluster,
-            RelTraitSet traitSet,
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        this(
-                cluster,
-                traitSet,
-                Collections.emptyList(),
-                left,
-                right,
-                correlationId,
-                requiredColumns,
-                joinType);
-    }
-
-    /** Creates a LogicalCorrelate by parsing serialized output. */
-    public LogicalCorrelate(RelInput input) {
-        this(
-                input.getCluster(),
-                input.getTraitSet(),
-                input.getInputs().get(0),
-                input.getInputs().get(1),
-                new CorrelationId(
-                        (Integer) requireNonNull(input.get("correlation"), "correlation")),
-                input.getBitSet("requiredColumns"),
-                requireNonNull(input.getEnum("joinType", JoinRelType.class), "joinType"));
-    }
-
-    /** Creates a LogicalCorrelate. */
-    public static LogicalCorrelate create(
-            RelNode left,
-            RelNode right,
-            List<RelHint> hints,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        final RelOptCluster cluster = left.getCluster();
-        final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
-        return new LogicalCorrelate(
-                cluster, traitSet, hints, left, right, correlationId, requiredColumns, joinType);
-    }
-
-    /** Creates a LogicalCorrelate. */
-    public static LogicalCorrelate create(
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        final RelOptCluster cluster = left.getCluster();
-        final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
-        return new LogicalCorrelate(
-                cluster, traitSet, left, right, correlationId, requiredColumns, joinType);
-    }
-
-    // ~ Methods ----------------------------------------------------------------
-
-    @Override
-    public LogicalCorrelate copy(
-            RelTraitSet traitSet,
-            RelNode left,
-            RelNode right,
-            CorrelationId correlationId,
-            ImmutableBitSet requiredColumns,
-            JoinRelType joinType) {
-        assert traitSet.containsIfApplicable(Convention.NONE);
-        return new LogicalCorrelate(
-                getCluster(),
-                traitSet,
-                hints,
-                left,
-                right,
-                correlationId,
-                requiredColumns,
-                joinType);
-    }
-
-    @Override
-    public RelNode accept(RelShuttle shuttle) {
-        return shuttle.visit(this);
-    }
-
-    @Override
-    public RelNode withHints(List<RelHint> hintList) {
-        return new LogicalCorrelate(
-                getCluster(),
-                traitSet,
-                hintList,
-                left,
-                right,
-                correlationId,
-                requiredColumns,
-                joinType);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java
new file mode 100644
index 00000000000..d861f2af623
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.fun;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.util.Pair;
+
+import java.util.AbstractList;
+import java.util.Map;
+
+/**
+ * Copied to keep the null semantics of Table API and SQL in sync. The SQL standard says the
+ * following about `ROW`:
+ *
+ * <ul>
+ *   <li>The value of {@code R IS NULL} is:
+ *       <ul>
+ *         <li>If the value of every field of V is the null value, then True.
+ *         <li>Otherwise, False.
+ *       </ul>
+ *   <li>The value of {@code R IS NOT NULL} is:
+ *       <ul>
+ *         <li>If the value of no field of V is the null value, then True.
+ *         <li>Otherwise, False.
+ *       </ul>
+ * </ul>
+ *
+ * <p>Calcite applies that logic since <a
+ * href="https://issues.apache.org/jira/browse/CALCITE-3627">CALCITE-3627</a> (1.30.0+).
+ *
+ * <ul>
+ *   <li>Thus, with Calcite 1.30.0+
+ *       <ul>
+ *         <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NOT NULL; -- returns
+ *             FALSE}
+ *         <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NULL; -- returns TRUE}.
+ *       </ul>
+ *   <li>With Flink and Calcite before 1.30.0 (current behavior of this class)
+ *       <ul>
+ *         <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NOT NULL; -- returns TRUE}
+ *         <li>{@code SELECT ROW(CAST(NULL AS INT), CAST(NULL AS INT)) IS NULL; -- returns FALSE}
+ *       </ul>
+ * </ul>
+ *
+ * Once Flink applies the same logic for both Table API and SQL, this class should be removed.
+ *
+ * <p>Changed lines
+ *
+ * <ol>
+ *   <li>Line 92 ~ 112
+ * </ol>
+ */
+public class SqlRowOperator extends SqlSpecialOperator {
+    // ~ Constructors -----------------------------------------------------------
+
+    public SqlRowOperator(String name) {
+        super(
+                name,
+                SqlKind.ROW,
+                MDX_PRECEDENCE,
+                false,
+                null,
+                InferTypes.RETURN_TYPE,
+                OperandTypes.VARIADIC);
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+        // ----- FLINK MODIFICATION BEGIN -----
+        // The type of a ROW(e1,e2) expression is a record with the types
+        // {e1type,e2type}.  According to the standard, field names are
+        // implementation-defined.
+        return opBinding
+                .getTypeFactory()
+                .createStructType(
+                        new AbstractList<Map.Entry<String, RelDataType>>() {
+                            @Override
+                            public Map.Entry<String, RelDataType> get(int index) {
+                                return Pair.of(
+                                        SqlUtil.deriveAliasFromOrdinal(index),
+                                        opBinding.getOperandType(index));
+                            }
+
+                            @Override
+                            public int size() {
+                                return opBinding.getOperandCount();
+                            }
+                        });
+        // ----- FLINK MODIFICATION END -----
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+        SqlUtil.unparseFunctionSyntax(this, writer, call, false);
+    }
+
+    // override SqlOperator
+    @Override
+    public boolean requiresDecimalExpansion() {
+        return false;
+    }
+}
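
For illustration: inferReturnType above names each field by ordinal via SqlUtil.deriveAliasFromOrdinal, so ROW(1, 'a') is typed as a record with fields EXPR$0 and EXPR$1. A hedged sketch that builds the equivalent struct type directly with a type factory:

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.sql.SqlUtil;
    import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class RowTypeSketch {
        public static void main(String[] args) {
            RelDataTypeFactory factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
            // Same shape as the AbstractList-based construction in inferReturnType.
            RelDataType rowType =
                    factory.builder()
                            .add(SqlUtil.deriveAliasFromOrdinal(0), // "EXPR$0"
                                    factory.createSqlType(SqlTypeName.INTEGER))
                            .add(SqlUtil.deriveAliasFromOrdinal(1), // "EXPR$1"
                                    factory.createSqlType(SqlTypeName.CHAR, 1))
                            .build();
            System.out.println(rowType.getFullTypeString());
        }
    }
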
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java
new file mode 100644
index 00000000000..a55c40475e2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeFamily;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Default implementation {@link SqlTypeFactoryImpl}, the class was copied over because of
+ * FLINK-31350.
+ *
+ * <p>FLINK modifications are at lines
+ *
+ * <ol>
+ *   <li>Should be removed after fix of FLINK-31350: Lines 541 ~ 553.
+ * </ol>
+ */
+public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl {
+    // ~ Constructors -----------------------------------------------------------
+
+    public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+        super(typeSystem);
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    @Override
+    public RelDataType createSqlType(SqlTypeName typeName) {
+        if (typeName.allowsPrec()) {
+            return createSqlType(typeName, typeSystem.getDefaultPrecision(typeName));
+        }
+        assertBasic(typeName);
+        RelDataType newType = new BasicSqlType(typeSystem, typeName);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createSqlType(SqlTypeName typeName, int precision) {
+        final int maxPrecision = typeSystem.getMaxPrecision(typeName);
+        if (maxPrecision >= 0 && precision > maxPrecision) {
+            precision = maxPrecision;
+        }
+        if (typeName.allowsScale()) {
+            return createSqlType(typeName, precision, typeName.getDefaultScale());
+        }
+        assertBasic(typeName);
+        assert (precision >= 0) || (precision == RelDataType.PRECISION_NOT_SPECIFIED);
+        // Does not check precision when typeName is SqlTypeName#NULL.
+        RelDataType newType =
+                precision == RelDataType.PRECISION_NOT_SPECIFIED
+                        ? new BasicSqlType(typeSystem, typeName)
+                        : new BasicSqlType(typeSystem, typeName, precision);
+        newType = SqlTypeUtil.addCharsetAndCollation(newType, this);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createSqlType(SqlTypeName typeName, int precision, int scale) {
+        assertBasic(typeName);
+        assert (precision >= 0) || (precision == RelDataType.PRECISION_NOT_SPECIFIED);
+        final int maxPrecision = typeSystem.getMaxPrecision(typeName);
+        if (maxPrecision >= 0 && precision > maxPrecision) {
+            precision = maxPrecision;
+        }
+        RelDataType newType = new BasicSqlType(typeSystem, typeName, precision, scale);
+        newType = SqlTypeUtil.addCharsetAndCollation(newType, this);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createUnknownType() {
+        // FLINK MODIFICATION BEGIN
+        return canonize(new UnknownSqlType(this));
+        // FLINK MODIFICATION END
+    }
+
+    @Override
+    public RelDataType createMultisetType(RelDataType type, long maxCardinality) {
+        assert maxCardinality == -1;
+        RelDataType newType = new MultisetSqlType(type, false);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createArrayType(RelDataType elementType, long maxCardinality) {
+        assert maxCardinality == -1;
+        ArraySqlType newType = new ArraySqlType(elementType, false);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createMapType(RelDataType keyType, RelDataType valueType) {
+        MapSqlType newType = new MapSqlType(keyType, valueType, false);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createSqlIntervalType(SqlIntervalQualifier intervalQualifier) {
+        RelDataType newType = new IntervalSqlType(typeSystem, intervalQualifier, false);
+        return canonize(newType);
+    }
+
+    @Override
+    public RelDataType createTypeWithCharsetAndCollation(
+            RelDataType type, Charset charset, SqlCollation collation) {
+        assert SqlTypeUtil.inCharFamily(type) : type;
+        requireNonNull(charset, "charset");
+        requireNonNull(collation, "collation");
+        RelDataType newType;
+        if (type instanceof BasicSqlType) {
+            BasicSqlType sqlType = (BasicSqlType) type;
+            newType = sqlType.createWithCharsetAndCollation(charset, collation);
+        } else if (type instanceof JavaType) {
+            JavaType javaType = (JavaType) type;
+            newType =
+                    new JavaType(
+                            javaType.getJavaClass(), javaType.isNullable(), charset, collation);
+        } else {
+            throw Util.needToImplement("need to implement " + type);
+        }
+        return canonize(newType);
+    }
+
+    @Override
+    public @Nullable RelDataType leastRestrictive(List<RelDataType> types) {
+        assert types != null;
+        assert types.size() >= 1;
+
+        RelDataType type0 = types.get(0);
+        if (type0.getSqlTypeName() != null) {
+            RelDataType resultType = leastRestrictiveSqlType(types);
+            if (resultType != null) {
+                return resultType;
+            }
+            return leastRestrictiveByCast(types);
+        }
+
+        return super.leastRestrictive(types);
+    }
+
+    private @Nullable RelDataType leastRestrictiveByCast(List<RelDataType> types) {
+        RelDataType resultType = types.get(0);
+        boolean anyNullable = resultType.isNullable();
+        for (int i = 1; i < types.size(); i++) {
+            RelDataType type = types.get(i);
+            if (type.getSqlTypeName() == SqlTypeName.NULL) {
+                anyNullable = true;
+                continue;
+            }
+
+            if (type.isNullable()) {
+                anyNullable = true;
+            }
+
+            if (SqlTypeUtil.canCastFrom(type, resultType, false)) {
+                resultType = type;
+            } else {
+                if (!SqlTypeUtil.canCastFrom(resultType, type, false)) {
+                    return null;
+                }
+            }
+        }
+        if (anyNullable) {
+            return createTypeWithNullability(resultType, true);
+        } else {
+            return resultType;
+        }
+    }
+
+    @Override
+    public RelDataType createTypeWithNullability(final RelDataType type, final boolean nullable) {
+        final RelDataType newType;
+        if (type instanceof BasicSqlType) {
+            newType = ((BasicSqlType) type).createWithNullability(nullable);
+        } else if (type instanceof MapSqlType) {
+            newType = copyMapType(type, nullable);
+        } else if (type instanceof ArraySqlType) {
+            newType = copyArrayType(type, nullable);
+        } else if (type instanceof MultisetSqlType) {
+            newType = copyMultisetType(type, nullable);
+        } else if (type instanceof IntervalSqlType) {
+            newType = copyIntervalType(type, nullable);
+        } else if (type instanceof ObjectSqlType) {
+            newType = copyObjectType(type, nullable);
+        } else {
+            return super.createTypeWithNullability(type, nullable);
+        }
+        return canonize(newType);
+    }
+
+    private static void assertBasic(SqlTypeName typeName) {
+        assert typeName != null;
+        assert typeName != SqlTypeName.MULTISET : "use createMultisetType() instead";
+        assert typeName != SqlTypeName.ARRAY : "use createArrayType() instead";
+        assert typeName != SqlTypeName.MAP : "use createMapType() instead";
+        assert typeName != SqlTypeName.ROW : "use createStructType() instead";
+        assert !SqlTypeName.INTERVAL_TYPES.contains(typeName)
+                : "use createSqlIntervalType() instead";
+    }
+
+    private @Nullable RelDataType leastRestrictiveSqlType(List<RelDataType> types) {
+        RelDataType resultType = null;
+        int nullCount = 0;
+        int nullableCount = 0;
+        int javaCount = 0;
+        int anyCount = 0;
+
+        for (RelDataType type : types) {
+            final SqlTypeName typeName = type.getSqlTypeName();
+            if (typeName == null) {
+                return null;
+            }
+            if (typeName == SqlTypeName.ANY) {
+                anyCount++;
+            }
+            if (type.isNullable()) {
+                ++nullableCount;
+            }
+            if (typeName == SqlTypeName.NULL) {
+                ++nullCount;
+            }
+            if (isJavaType(type)) {
+                ++javaCount;
+            }
+        }
+
+        //  if any of the inputs are ANY, the output is ANY
+        if (anyCount > 0) {
+            return createTypeWithNullability(
+                    createSqlType(SqlTypeName.ANY), nullCount > 0 || nullableCount > 0);
+        }
+
+        for (int i = 0; i < types.size(); ++i) {
+            RelDataType type = types.get(i);
+            RelDataTypeFamily family = type.getFamily();
+
+            final SqlTypeName typeName = type.getSqlTypeName();
+            if (typeName == SqlTypeName.NULL) {
+                continue;
+            }
+
+            // Convert Java types; for instance, JavaType(int) becomes INTEGER.
+            // Except if all types are either NULL or Java types.
+            if (isJavaType(type) && javaCount + nullCount < types.size()) {
+                final RelDataType originalType = type;
+                type =
+                        typeName.allowsPrecScale(true, true)
+                                ? createSqlType(typeName, type.getPrecision(), type.getScale())
+                                : typeName.allowsPrecScale(true, false)
+                                        ? createSqlType(typeName, type.getPrecision())
+                                        : createSqlType(typeName);
+                type = createTypeWithNullability(type, originalType.isNullable());
+            }
+
+            if (resultType == null) {
+                resultType = type;
+                SqlTypeName sqlTypeName = resultType.getSqlTypeName();
+                if (sqlTypeName == SqlTypeName.ROW) {
+                    return leastRestrictiveStructuredType(types);
+                }
+                if (sqlTypeName == SqlTypeName.ARRAY || sqlTypeName == SqlTypeName.MULTISET) {
+                    return leastRestrictiveArrayMultisetType(types, sqlTypeName);
+                }
+                if (sqlTypeName == SqlTypeName.MAP) {
+                    return leastRestrictiveMapType(types, sqlTypeName);
+                }
+            }
+
+            RelDataTypeFamily resultFamily = resultType.getFamily();
+            SqlTypeName resultTypeName = resultType.getSqlTypeName();
+
+            if (resultFamily != family) {
+                return null;
+            }
+            if (SqlTypeUtil.inCharOrBinaryFamilies(type)) {
+                Charset charset1 = type.getCharset();
+                Charset charset2 = resultType.getCharset();
+                SqlCollation collation1 = type.getCollation();
+                SqlCollation collation2 = resultType.getCollation();
+
+                final int precision =
+                        SqlTypeUtil.maxPrecision(resultType.getPrecision(), type.getPrecision());
+
+                // If either type is LOB, then result is LOB with no precision.
+                // Otherwise, if either is variable width, result is variable
+                // width.  Otherwise, result is fixed width.
+                if (SqlTypeUtil.isLob(resultType)) {
+                    resultType = createSqlType(resultType.getSqlTypeName());
+                } else if (SqlTypeUtil.isLob(type)) {
+                    resultType = createSqlType(type.getSqlTypeName());
+                } else if (SqlTypeUtil.isBoundedVariableWidth(resultType)) {
+                    resultType = createSqlType(resultType.getSqlTypeName(), precision);
+                } else {
+                    // this catch-all case covers type variable, and both fixed
+
+                    SqlTypeName newTypeName = type.getSqlTypeName();
+
+                    if (typeSystem.shouldConvertRaggedUnionTypesToVarying()) {
+                        if (resultType.getPrecision() != type.getPrecision()) {
+                            if (newTypeName == SqlTypeName.CHAR) {
+                                newTypeName = SqlTypeName.VARCHAR;
+                            } else if (newTypeName == SqlTypeName.BINARY) {
+                                newTypeName = SqlTypeName.VARBINARY;
+                            }
+                        }
+                    }
+
+                    resultType = createSqlType(newTypeName, precision);
+                }
+                Charset charset = null;
+                // TODO:  refine collation combination rules
+                SqlCollation collation0 =
+                        collation1 != null && collation2 != null
+                                ? SqlCollation.getCoercibilityDyadicOperator(collation1, collation2)
+                                : null;
+                SqlCollation collation = null;
+                if ((charset1 != null) || (charset2 != null)) {
+                    if (charset1 == null) {
+                        charset = charset2;
+                        collation = collation2;
+                    } else if (charset2 == null) {
+                        charset = charset1;
+                        collation = collation1;
+                    } else if (charset1.equals(charset2)) {
+                        charset = charset1;
+                        collation = collation1;
+                    } else if (charset1.contains(charset2)) {
+                        charset = charset1;
+                        collation = collation1;
+                    } else {
+                        charset = charset2;
+                        collation = collation2;
+                    }
+                }
+                if (charset != null) {
+                    resultType =
+                            createTypeWithCharsetAndCollation(
+                                    resultType,
+                                    charset,
+                                    collation0 != null
+                                            ? collation0
+                                            : requireNonNull(collation, "collation"));
+                }
+            } else if (SqlTypeUtil.isExactNumeric(type)) {
+                if (SqlTypeUtil.isExactNumeric(resultType)) {
+                    // TODO: come up with a cleaner way to support
+                    // interval + datetime = datetime
+                    if (types.size() > (i + 1)) {
+                        RelDataType type1 = types.get(i + 1);
+                        if (SqlTypeUtil.isDatetime(type1)) {
+                            resultType = type1;
+                            return createTypeWithNullability(
+                                    resultType, nullCount > 0 || nullableCount > 0);
+                        }
+                    }
+                    if (!type.equals(resultType)) {
+                        if (!typeName.allowsPrec() && !resultTypeName.allowsPrec()) {
+                            // use the bigger primitive
+                            if (type.getPrecision() > resultType.getPrecision()) {
+                                resultType = type;
+                            }
+                        } else {
+                            // Let the result type have precision (p), scale (s)
+                            // and number of whole digits (d) as follows: d =
+                            // max(p1 - s1, p2 - s2) s <= max(s1, s2) p = s + d
+
+                            int p1 = resultType.getPrecision();
+                            int p2 = type.getPrecision();
+                            int s1 = resultType.getScale();
+                            int s2 = type.getScale();
+                            final int maxPrecision = typeSystem.getMaxNumericPrecision();
+                            final int maxScale = typeSystem.getMaxNumericScale();
+
+                            int dout = Math.max(p1 - s1, p2 - s2);
+                            dout = Math.min(dout, maxPrecision);
+
+                            int scale = Math.max(s1, s2);
+                            scale = Math.min(scale, maxPrecision - dout);
+                            scale = Math.min(scale, maxScale);
+
+                            int precision = dout + scale;
+                            assert precision <= maxPrecision;
+                            assert precision > 0
+                                    || (resultType.getSqlTypeName() == SqlTypeName.DECIMAL
+                                            && precision == 0
+                                            && scale == 0);
+
+                            resultType = createSqlType(SqlTypeName.DECIMAL, precision, scale);
+                        }
+                    }
+                } else if (SqlTypeUtil.isApproximateNumeric(resultType)) {
+                    // already approximate; promote to double just in case
+                    // TODO:  only promote when required
+                    if (SqlTypeUtil.isDecimal(type)) {
+                        // Only promote to double for decimal types
+                        resultType = createDoublePrecisionType();
+                    }
+                } else {
+                    return null;
+                }
+            } else if (SqlTypeUtil.isApproximateNumeric(type)) {
+                if (SqlTypeUtil.isApproximateNumeric(resultType)) {
+                    if (type.getPrecision() > resultType.getPrecision()) {
+                        resultType = type;
+                    }
+                } else if (SqlTypeUtil.isExactNumeric(resultType)) {
+                    if (SqlTypeUtil.isDecimal(resultType)) {
+                        resultType = createDoublePrecisionType();
+                    } else {
+                        resultType = type;
+                    }
+                } else {
+                    return null;
+                }
+            } else if (SqlTypeUtil.isInterval(type)) {
+                // TODO: come up with a cleaner way to support
+                // interval + datetime = datetime
+                if (types.size() > (i + 1)) {
+                    RelDataType type1 = types.get(i + 1);
+                    if (SqlTypeUtil.isDatetime(type1)) {
+                        resultType = type1;
+                        return createTypeWithNullability(
+                                resultType, nullCount > 0 || nullableCount > 0);
+                    }
+                }
+
+                if (!type.equals(resultType)) {
+                    // TODO jvs 4-June-2005:  This shouldn't be necessary;
+                    // move logic into IntervalSqlType.combine
+                    Object type1 = resultType;
+                    resultType =
+                            ((IntervalSqlType) resultType).combine(this, (IntervalSqlType) type);
+                    resultType =
+                            ((IntervalSqlType) resultType).combine(this, (IntervalSqlType) type1);
+                }
+            } else if (SqlTypeUtil.isDatetime(type)) {
+                // TODO: come up with a cleaner way to support
+                // datetime +/- interval (or integer) = datetime
+                if (types.size() > (i + 1)) {
+                    RelDataType type1 = types.get(i + 1);
+                    if (SqlTypeUtil.isInterval(type1) || SqlTypeUtil.isIntType(type1)) {
+                        resultType = type;
+                        return createTypeWithNullability(
+                                resultType, nullCount > 0 || nullableCount > 0);
+                    }
+                }
+            } else {
+                // TODO:  datetime precision details; for now we let
+                // leastRestrictiveByCast handle it
+                return null;
+            }
+        }
+        if (resultType != null && nullableCount > 0) {
+            resultType = createTypeWithNullability(resultType, true);
+        }
+        return resultType;
+    }
+
+    private RelDataType createDoublePrecisionType() {
+        return createSqlType(SqlTypeName.DOUBLE);
+    }
+
+    private RelDataType copyMultisetType(RelDataType type, boolean nullable) {
+        MultisetSqlType mt = (MultisetSqlType) type;
+        RelDataType elementType = copyType(mt.getComponentType());
+        return new MultisetSqlType(elementType, nullable);
+    }
+
+    private RelDataType copyIntervalType(RelDataType type, boolean nullable) {
+        return new IntervalSqlType(
+                typeSystem,
+                requireNonNull(
+                        type.getIntervalQualifier(),
+                        () -> "type.getIntervalQualifier() for " + type),
+                nullable);
+    }
+
+    private static RelDataType copyObjectType(RelDataType type, boolean nullable) {
+        return new ObjectSqlType(
+                type.getSqlTypeName(),
+                type.getSqlIdentifier(),
+                nullable,
+                type.getFieldList(),
+                type.getComparability());
+    }
+
+    private RelDataType copyArrayType(RelDataType type, boolean nullable) {
+        ArraySqlType at = (ArraySqlType) type;
+        RelDataType elementType = copyType(at.getComponentType());
+        return new ArraySqlType(elementType, nullable);
+    }
+
+    private RelDataType copyMapType(RelDataType type, boolean nullable) {
+        MapSqlType mt = (MapSqlType) type;
+        RelDataType keyType = copyType(mt.getKeyType());
+        RelDataType valueType = copyType(mt.getValueType());
+        return new MapSqlType(keyType, valueType, nullable);
+    }
+
+    @Override
+    protected RelDataType canonize(RelDataType type) {
+        type = super.canonize(type);
+        if (!(type instanceof ObjectSqlType)) {
+            return type;
+        }
+        ObjectSqlType objectType = (ObjectSqlType) type;
+        if (!objectType.isNullable()) {
+            objectType.setFamily(objectType);
+        } else {
+            objectType.setFamily((RelDataTypeFamily) createTypeWithNullability(objectType, false));
+        }
+        return type;
+    }
+
+    // FLINK MODIFICATION BEGIN
+    /** The unknown type. Similar to the NULL type, but is only equal to itself. */
+    static class UnknownSqlType extends BasicSqlType {
+        UnknownSqlType(RelDataTypeFactory typeFactory) {
+            super(typeFactory.getTypeSystem(), SqlTypeName.NULL);
+        }
+
+        @Override
+        protected void generateTypeString(StringBuilder sb, boolean withDetail) {
+            sb.append("UNKNOWN");
+        }
+    }
+    // FLINK MODIFICATION END
+}
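
For context, the collection copies above share one pattern: rebuild the wrapper
around a recursively copied component type, then apply nullability only at the
outermost level. A minimal sketch of that pattern against the public
RelDataTypeFactory API (the helper class and method names are illustrative, not
part of this patch):

    import static java.util.Objects.requireNonNull;

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;

    class CopyTypeSketch {
        // Copies ARRAY<e>: the component type is copied recursively, and the
        // requested nullability is applied to the outer array only; -1 means
        // unlimited cardinality. Callers must pass a collection type.
        static RelDataType copyArray(
                RelDataTypeFactory factory, RelDataType arrayType, boolean nullable) {
            RelDataType component =
                    factory.copyType(requireNonNull(arrayType.getComponentType()));
            return factory.createTypeWithNullability(
                    factory.createArrayType(component, -1), nullable);
        }
    }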
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index b237f3db638..d05271f03c1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -305,7 +305,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
     // ~ Methods ----------------------------------------------------------------
 
     public SqlConformance getConformance() {
-        return config.sqlConformance();
+        return config.conformance();
     }
 
     @Pure
@@ -539,7 +539,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
 
         final SqlIdentifier identifier = (SqlIdentifier) selectItem;
         if (!identifier.isSimple()) {
-            if (!validator.config().sqlConformance().allowQualifyingCommonColumn()) {
+            if (!validator.config().conformance().allowQualifyingCommonColumn()) {
                 validateQualifiedCommonColumn((SqlJoin) from, identifier, scope, validator);
             }
             return selectItem;
@@ -3420,7 +3420,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
         // a NATURAL keyword?
         switch (joinType) {
             case LEFT_SEMI_JOIN:
-                if (!this.config.sqlConformance().isLiberal()) {
+                if (!this.config.conformance().isLiberal()) {
                     throw newValidationError(
                             join.getJoinTypeNode(),
                             RESOURCE.dialectDoesNotSupportFeature("LEFT SEMI JOIN"));
@@ -3551,7 +3551,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
         }
 
         if (select.getFrom() == null) {
-            if (this.config.sqlConformance().isFromRequired()) {
+            if (this.config.conformance().isFromRequired()) {
                 throw newValidationError(select, RESOURCE.selectMissingFrom());
             }
         } else {
@@ -4282,7 +4282,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
             return;
         }
        final AggregatingScope havingScope = (AggregatingScope) getSelectScope(select);
-        if (config.sqlConformance().isHavingAlias()) {
+        if (config.conformance().isHavingAlias()) {
            SqlNode newExpr = expandGroupByOrHavingExpr(having, havingScope, select, true);
             if (having != newExpr) {
                 having = newExpr;
@@ -4732,7 +4732,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
 
    protected RelDataType getLogicalTargetRowType(RelDataType targetRowType, SqlInsert insert) {
         if (insert.getTargetColumnList() == null
-                && this.config.sqlConformance().isInsertSubsetColumnsAllowed()) {
+                && this.config.conformance().isInsertSubsetColumnsAllowed()) {
             // Target an implicit subset of columns.
             final SqlNode source = insert.getSource();
            final RelDataType sourceRowType = getNamespaceOrThrow(source).getRowType();
@@ -5034,7 +5034,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
             }
 
             SqlCall rowConstructor = (SqlCall) operand;
-            if (this.config.sqlConformance().isInsertSubsetColumnsAllowed()
+            if (this.config.conformance().isInsertSubsetColumnsAllowed()
                     && targetRowType.isStruct()
                    && rowConstructor.operandCount() < targetRowType.getFieldCount()) {
                 targetRowType =
@@ -5655,7 +5655,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
 
         // What columns from the input are not referenced by a column in the IN
         // list?
-        final SqlValidatorNamespace inputNs = Objects.requireNonNull(getNamespace(unpivot.query));
+        final SqlValidatorNamespace inputNs = requireNonNull(getNamespace(unpivot.query));
         final Set<String> unusedColumnNames = catalogReader.nameMatcher().createSet();
         unusedColumnNames.addAll(inputNs.getRowType().getFieldNames());
         unusedColumnNames.removeAll(unpivot.usedColumnNames());
@@ -5873,7 +5873,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
         if ((call.operandCount() == 0)
                 && (operator.getSyntax() == SqlSyntax.FUNCTION_ID)
                 && !call.isExpanded()
-                && !this.config.sqlConformance().allowNiladicParentheses()) {
+                && !this.config.conformance().allowNiladicParentheses()) {
             // For example, "LOCALTIME()" is illegal. (It should be
             // "LOCALTIME", which would have been handled as a
             // SqlIdentifier.)
@@ -6416,7 +6416,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
             // Ordinal markers, e.g. 'select a, b from t order by 2'.
             // Only recognize them if they are the whole expression,
             // and if the dialect permits.
-            if (literal == root && config.sqlConformance().isSortByOrdinal()) {
+            if (literal == root && config.conformance().isSortByOrdinal()) {
                 switch (literal.getTypeName()) {
                     case DECIMAL:
                     case DOUBLE:
@@ -6462,7 +6462,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
         @Override
         public SqlNode visit(SqlIdentifier id) {
             // Aliases, e.g. 'select a as x, b from t order by x'.
-            if (id.isSimple() && config.sqlConformance().isSortByAlias()) {
+            if (id.isSimple() && config.conformance().isSortByAlias()) {
                 String alias = id.getSimple();
                final SqlValidatorNamespace selectNs = getNamespaceOrThrow(select);
                final RelDataType rowType = selectNs.getRowTypeSansSystemColumns();
@@ -6537,8 +6537,8 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
         public @Nullable SqlNode visit(SqlIdentifier id) {
             if (id.isSimple()
                     && (havingExpr
-                            ? validator.config().sqlConformance().isHavingAlias()
-                            : validator.config().sqlConformance().isGroupByAlias())) {
+                            ? validator.config().conformance().isHavingAlias()
+                            : validator.config().conformance().isGroupByAlias())) {
                 String name = id.getSimple();
                 SqlNode expr = null;
                final SqlNameMatcher nameMatcher = validator.catalogReader.nameMatcher();
@@ -6579,7 +6579,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
 
         @Override
         public @Nullable SqlNode visit(SqlLiteral literal) {
-            if (havingExpr || !validator.config().sqlConformance().isGroupByOrdinal()) {
+            if (havingExpr || !validator.config().conformance().isGroupByOrdinal()) {
                 return super.visit(literal);
             }
             boolean isOrdinalLiteral = literal == root;
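
All of the one-line changes in this file track a single Calcite 1.30 API rename:
SqlValidator.Config#sqlConformance() became #conformance(). A minimal sketch of the
new call shape (the wrapper class and method names here are illustrative only):

    import org.apache.calcite.sql.validate.SqlConformance;
    import org.apache.calcite.sql.validate.SqlValidator;

    class ConformanceSketch {
        static SqlConformance conformanceOf(SqlValidator validator) {
            // Calcite <= 1.29: validator.config().sqlConformance()
            return validator.config().conformance();
        }
    }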
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 49aa96ae8f7..2b0844071c3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -220,9 +220,9 @@ import static org.apache.calcite.sql.SqlUtil.stripAs;
  *
  * <ol>
  *   <li>Added in FLINK-29081, FLINK-28682: Lines 633 ~ 643
- *   <li>Added in FLINK-28682: Lines 2095 ~ 2112
- *   <li>Added in FLINK-28682: Lines 2149 ~ 2177
- *   <li>Added in FLINK-20873: Lines 5198 ~ 5207
+ *   <li>Added in FLINK-28682: Lines 2130 ~ 2147
+ *   <li>Added in FLINK-28682: Lines 2184 ~ 2212
+ *   <li>Added in FLINK-20873: Lines 5245 ~ 5254
  * </ol>
  */
 @SuppressWarnings("UnstableApiUsage")
@@ -231,7 +231,7 @@ public class SqlToRelConverter {
     // ~ Static fields/initializers ---------------------------------------------
 
     /** Default configuration. */
-    private static final Config CONFIG =
+    public static final Config CONFIG =
             ImmutableSqlToRelConverter.Config.builder()
                     .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
                    .withRelBuilderConfigTransform(c -> c.withPushJoinCondition(true))
@@ -248,7 +248,7 @@ public class SqlToRelConverter {
 
    // ~ Instance fields --------------------------------------------------------
 
-    protected final @Nullable SqlValidator validator;
+    public final @Nullable SqlValidator validator;
     protected final RexBuilder rexBuilder;
     protected final Prepare.CatalogReader catalogReader;
     protected final RelOptCluster cluster;
@@ -548,7 +548,7 @@ public class SqlToRelConverter {
      */
     public RelNode trimUnusedFields(boolean ordered, RelNode rootRel) {
         // Trim fields that are not used by their consumer.
-        if (isTrimUnusedFields()) {
+        if (config.isTrimUnusedFields()) {
             final RelFieldTrimmer trimmer = newFieldTrimmer();
             final List<RelCollation> collations =
                    rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
@@ -698,6 +698,41 @@ public class SqlToRelConverter {
    /** Implementation of {@link #convertSelect(SqlSelect, boolean)}; derived class may override. */
     protected void convertSelectImpl(final Blackboard bb, SqlSelect select) {
         convertFrom(bb, select.getFrom());
+
+        // We would like to remove ORDER BY clause from an expanded view, except if
+        // it is top-level or affects semantics.
+        //
+        // Top-level example. Given the view definition
+        //   CREATE VIEW v AS SELECT * FROM t ORDER BY x
+        // we would retain the view's ORDER BY in
+        //   SELECT * FROM v
+        // or
+        //   SELECT * FROM v WHERE y = 5
+        // but remove the view's ORDER BY in
+        //   SELECT * FROM v ORDER BY z
+        // and
+        //   SELECT deptno, COUNT(*) FROM v GROUP BY deptno
+        // because the ORDER BY and GROUP BY mean that the view is not 'top level' in
+        // the query.
+        //
+        // Semantics example. Given the view definition
+        //   CREATE VIEW v2 AS SELECT * FROM t ORDER BY x LIMIT 10
+        // we would never remove the ORDER BY, because "ORDER BY ... LIMIT" is about
+        // semantics. It is not a 'pure order'.
+        if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && config.isRemoveSortInSubQuery()) {
+            // Remove the Sort if the view is at the top level. Also remove the Sort
+            // if there are other nodes, which will cause the view to be in the
+            // sub-query.
+            if (!bb.top
+                    || validator().isAggregate(select)
+                    || select.isDistinct()
+                    || select.hasOrderBy()
+                    || select.getFetch() != null
+                    || select.getOffset() != null) {
+                bb.setRoot(castNonNull(bb.root).getInput(0), true);
+            }
+        }
+
         convertWhere(bb, select.getWhere());
 
         final List<SqlNode> orderExprList = new ArrayList<>();
@@ -2299,7 +2334,7 @@ public class SqlToRelConverter {
         }
        RelNode child = (null != bb.root) ? bb.root : LogicalValues.createOneRow(cluster);
         RelNode uncollect;
-        if (validator().config().sqlConformance().allowAliasUnnestItems()) {
+        if (validator().config().conformance().allowAliasUnnestItems()) {
             uncollect =
                     relBuilder
                             .push(child)
@@ -2656,15 +2691,19 @@ public class SqlToRelConverter {
                    SqlValidatorUtil.getExtendedColumns(validator, validatorTable, extendedColumns);
             table = table.extend(extendedFields);
         }
-        final RelNode tableRel;
         // Review Danny 2020-01-13: hacky to construct a new table scan
         // in order to apply the hint strategies.
         final List<RelHint> hints =
                 hintStrategies.apply(
                         SqlUtil.getRelHint(hintStrategies, tableHints),
                        LogicalTableScan.create(cluster, table, ImmutableList.of()));
-        tableRel = toRel(table, hints);
+        final RelNode tableRel = toRel(table, hints);
         bb.setRoot(tableRel, true);
+
+        if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && removeSortInSubQuery(bb.top)) {
+            bb.setRoot(castNonNull(bb.root).getInput(0), true);
+        }
+
         if (usedDataset[0]) {
             bb.setDataset(datasetName);
         }
@@ -2813,7 +2852,8 @@ public class SqlToRelConverter {
                requiredCols = ImmutableBitSet.fromBitSet(shuttle.varCols).union(p.requiredColumns);
             }
 
-            return LogicalCorrelate.create(leftRel, innerRel, p.id, requiredCols, joinType);
+            return LogicalCorrelate.create(
+                    leftRel, innerRel, ImmutableList.of(), p.id, requiredCols, joinType);
         }
 
         final RelNode node =
@@ -3937,7 +3977,7 @@ public class SqlToRelConverter {
         final RelDataType tableRowType = targetTable.getRowType();
         SqlNodeList targetColumnList = call.getTargetColumnList();
         if (targetColumnList == null) {
-            if (validator().config().sqlConformance().isInsertSubsetColumnsAllowed()) {
+            if (validator().config().conformance().isInsertSubsetColumnsAllowed()) {
                 final RelDataType targetRowType =
                         typeFactory.createStructType(
                                 tableRowType
@@ -4163,17 +4203,12 @@ public class SqlToRelConverter {
         } else {
             qualified = SqlQualified.create(null, 1, null, identifier);
         }
-        final Pair<RexNode, @Nullable Map<String, Integer>> e0 =
-                requireNonNull(
-                        bb.lookupExp(qualified), () -> "no expression found for " + qualified);
+        final Pair<RexNode, @Nullable BiFunction<RexNode, String, RexNode>> e0 =
+                bb.lookupExp(qualified);
         RexNode e = e0.left;
         for (String name : qualified.suffix()) {
             if (e == e0.left && e0.right != null) {
-                Integer i =
-                        requireNonNull(
-                                e0.right.get(name),
-                                () -> "e0.right.get(name) produced null for " + name);
-                e = rexBuilder.makeFieldAccess(e, i);
+                e = e0.right.apply(e, name);
             } else {
                final boolean caseSensitive = true; // name already fully-qualified
                if (identifier.isStar() && bb.scope instanceof MatchRecognizeScope) {
@@ -4649,7 +4684,9 @@ public class SqlToRelConverter {
                    mapRootRelToFieldProjection.put(newLeftInput, currentProjection);
                 }
 
-                setRoot(newLeftInput, false);
+                // if the original root rel is a leaf rel, the new root should be a leaf.
+                // otherwise the field offset will be wrong.
+                setRoot(newLeftInput, leaves.remove(root()) != null);
 
                 // right fields appear after the LHS fields.
                 final int rightOffset =
@@ -4758,13 +4795,13 @@ public class SqlToRelConverter {
         }
 
         /**
-         * Returns an expression with which to reference a from-list item.
+         * Returns an expression with which to reference a from-list item; throws if not found.
          *
-         * @param qualified the alias of the from item
-         * @return a {@link RexFieldAccess} or {@link RexRangeRef}, or null if not found
+         * @param qualified The alias of the FROM item
+         * @return a {@link RexFieldAccess} or {@link RexRangeRef}, never null
          */
-        @Nullable
-        Pair<RexNode, @Nullable Map<String, Integer>> lookupExp(SqlQualified qualified) {
+        Pair<RexNode, @Nullable BiFunction<RexNode, String, RexNode>> lookupExp(
+                SqlQualified qualified) {
             if (nameToNodeMap != null && qualified.prefixLength == 1) {
                RexNode node = nameToNodeMap.get(qualified.identifier.names.get(0));
                 if (node == null) {
@@ -4779,8 +4816,12 @@ public class SqlToRelConverter {
                     scope().getValidator().getCatalogReader().nameMatcher();
            final SqlValidatorScope.ResolvedImpl resolved = new SqlValidatorScope.ResolvedImpl();
             scope().resolve(qualified.prefix(), nameMatcher, false, resolved);
-            if (!(resolved.count() == 1)) {
-                return null;
+            if (resolved.count() != 1) {
+                throw new AssertionError(
+                        "no unique expression found for "
+                                + qualified
+                                + "; count is "
+                                + resolved.count());
             }
             final SqlValidatorScope.Resolve resolve = resolved.only();
             final RelDataType rowType = resolve.rowType();
@@ -4793,18 +4834,16 @@ public class SqlToRelConverter {
             if ((inputs != null) && !isParent) {
                final LookupContext rels = new LookupContext(this, inputs, systemFieldList.size());
                final RexNode node = lookup(resolve.path.steps().get(0).i, rels);
-                if (node == null) {
-                    return null;
-                } else {
-                    final Map<String, Integer> fieldOffsets = new HashMap<>();
-                    for (RelDataTypeField f : resolve.rowType().getFieldList()) {
-                        if (!fieldOffsets.containsKey(f.getName())) {
-                            fieldOffsets.put(f.getName(), f.getIndex());
-                        }
-                    }
-                    final Map<String, Integer> map = ImmutableMap.copyOf(fieldOffsets);
-                    return Pair.of(node, map);
-                }
+                assert node != null;
+                return Pair.of(
+                        node,
+                        (e, fieldName) -> {
+                            final RelDataTypeField field =
+                                    requireNonNull(
+                                            rowType.getField(fieldName, true, false),
+                                            () -> "field " + fieldName);
+                            return rexBuilder.makeFieldAccess(e, field.getIndex());
+                        });
             } else {
                 // We're referencing a relational expression which has not been
                 // converted yet. This occurs when from items are correlated,
@@ -4833,7 +4872,15 @@ public class SqlToRelConverter {
                         offset += c.getRowType().getFieldCount();
                     }
                    final RexNode c = rexBuilder.makeCorrel(builder.uniquify().build(), correlId);
-                    return Pair.of(c, fields.build());
+                    final ImmutableMap<String, Integer> fieldMap = fields.build();
+                    return Pair.of(
+                            c,
+                            (e, fieldName) -> {
+                                final int j =
+                                        requireNonNull(
+                                                fieldMap.get(fieldName), "field " + fieldName);
+                                return rexBuilder.makeFieldAccess(e, j);
+                            });
                 }
             }
         }
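
Calcite 1.30 also changed Blackboard#lookupExp above: instead of a nullable
Pair<RexNode, Map<String, Integer>>, it now returns a non-null Pair whose right side
builds the field access directly. A minimal sketch of such an accessor (the wrapper
class and method names are illustrative, not part of this patch):

    import static java.util.Objects.requireNonNull;

    import java.util.function.BiFunction;

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexNode;

    class LookupExpSketch {
        // Resolves a suffix name to a field access lazily, instead of
        // materializing a name-to-ordinal map up front as the 1.29 code did.
        static BiFunction<RexNode, String, RexNode> fieldAccessor(
                RexBuilder rexBuilder, RelDataType rowType) {
            return (e, fieldName) ->
                    rexBuilder.makeFieldAccess(
                            e,
                            requireNonNull(
                                            rowType.getField(fieldName, true, false),
                                            () -> "field " + fieldName)
                                    .getIndex());
        }
    }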
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 98c8d7ce3ca..00e5075e3c5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -154,7 +154,7 @@ import static org.apache.calcite.util.Static.RESOURCE;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Should be removed after fix of FLINK-29804: Lines 2927 ~ 2930
+ *   <li>Should be removed after fix of FLINK-29804: Lines 2928 ~ 2931
  * </ol>
  */
 @Value.Enclosing
@@ -2879,7 +2879,8 @@ public class RelBuilder {
                tableSpool(Spool.Type.LAZY, Spool.Type.LAZY, finder.relOptTable).build();
        RelNode seed = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY, finder.relOptTable).build();
         RelNode repeatUnion =
-                struct.repeatUnionFactory.createRepeatUnion(seed, iterative, all, iterationLimit);
+                struct.repeatUnionFactory.createRepeatUnion(
+                        seed, iterative, all, iterationLimit, finder.relOptTable);
         return push(repeatUnion);
     }
 
@@ -2962,7 +2963,7 @@ public class RelBuilder {
            final ImmutableBitSet requiredColumns = RelOptUtil.correlationColumns(id, right.rel);
             join =
                     struct.correlateFactory.createCorrelate(
-                            left.rel, right.rel, id, requiredColumns, joinType);
+                            left.rel, right.rel, ImmutableList.of(), id, requiredColumns, joinType);
         } else {
             RelNode join0 =
                     struct.joinFactory.createJoin(
@@ -3020,6 +3021,7 @@ public class RelBuilder {
                 struct.correlateFactory.createCorrelate(
                         left.rel,
                         right.rel,
+                        ImmutableList.of(),
                         correlationId,
                         ImmutableBitSet.of(requiredOrdinals),
                         joinType);
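
The extra ImmutableList.of() argument at both correlate call sites above reflects a
Calcite 1.30 signature change: CorrelateFactory#createCorrelate and
LogicalCorrelate#create now accept a list of hints. A sketch of the new shape
(variable and class names are illustrative):

    import com.google.common.collect.ImmutableList;

    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.core.CorrelationId;
    import org.apache.calcite.rel.core.JoinRelType;
    import org.apache.calcite.rel.logical.LogicalCorrelate;
    import org.apache.calcite.util.ImmutableBitSet;

    class CorrelateSketch {
        static RelNode correlate(
                RelNode left, RelNode right, CorrelationId id, ImmutableBitSet requiredColumns) {
            // An empty hint list preserves the pre-1.30 behavior.
            return LogicalCorrelate.create(
                    left, right, ImmutableList.of(), id, requiredColumns, JoinRelType.INNER);
        }
    }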
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index cbb7df05680..c5111219283 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -45,6 +45,7 @@ import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUnknownAs;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
@@ -90,6 +91,7 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_INTERNAL_NAME;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_KIND;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NAME;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NULL_AS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG;
@@ -217,9 +219,18 @@ final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
                 builder.add(range);
             }
         }
-        final boolean containsNull = sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue();
-        return rexBuilder.makeSearchArgumentLiteral(
-                Sarg.of(containsNull, builder.build()), relDataType);
+        // TODO: Since 1.18.0 nothing is serialized to FIELD_NAME_CONTAINS_NULL.
+        // This branch (to be removed in a future version) is kept for backward
+        // compatibility with Flink 1.17.x, where FIELD_NAME_CONTAINS_NULL was still used.
+        final JsonNode containsNull = sargNode.get(FIELD_NAME_CONTAINS_NULL);
+        if (containsNull != null) {
+            return rexBuilder.makeSearchArgumentLiteral(
+                    Sarg.of(containsNull.booleanValue(), builder.build()), relDataType);
+        }
+        final RexUnknownAs nullAs =
+                serializableToCalcite(
+                        RexUnknownAs.class, sargNode.required(FIELD_NAME_NULL_AS).asText());
+        return rexBuilder.makeSearchArgumentLiteral(Sarg.of(nullAs, builder.build()), relDataType);
     }
 
     private static @Nullable Object deserializeLiteralValue(
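
The serde change above replaces the boolean "containsNull" field with the
three-valued RexUnknownAs ("nullAs") that Calcite 1.30 uses inside Sarg. A sketch of
building the Sarg behind the new testSarg predicate "a = 1 OR a = 2 OR a IS NULL"
(using BigDecimal as the value class is an assumption for illustration):

    import java.math.BigDecimal;

    import com.google.common.collect.ImmutableRangeSet;
    import com.google.common.collect.Range;

    import org.apache.calcite.rex.RexUnknownAs;
    import org.apache.calcite.util.Sarg;

    class SargSketch {
        static Sarg<BigDecimal> sargForOneTwoOrNull() {
            ImmutableRangeSet<BigDecimal> ranges =
                    ImmutableRangeSet.<BigDecimal>builder()
                            .add(Range.singleton(BigDecimal.valueOf(1)))
                            .add(Range.singleton(BigDecimal.valueOf(2)))
                            .build();
            // NULL input should satisfy the predicate, so UNKNOWN maps to TRUE;
            // this serializes as "nullAs" : "TRUE" instead of the old containsNull flag.
            return Sarg.of(RexUnknownAs.TRUE, ranges);
        }
    }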
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index 5e788723cd3..f0b6bf80fae 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -93,7 +93,9 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
     static final String FIELD_NAME_BOUND_LOWER = "lower";
     static final String FIELD_NAME_BOUND_UPPER = "upper";
     static final String FIELD_NAME_BOUND_TYPE = "boundType";
+
     static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
+    static final String FIELD_NAME_NULL_AS = "nullAs";
     // Symbol fields
     static final String FIELD_NAME_SYMBOL = "symbol";
 
@@ -312,7 +314,8 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
             gen.writeEndObject();
         }
         gen.writeEndArray();
-        gen.writeBooleanField(FIELD_NAME_CONTAINS_NULL, value.containsNull);
+        final SerializableSymbol symbol = calciteToSerializable(value.nullAs);
+        gen.writeStringField(FIELD_NAME_NULL_AS, symbol.getValue());
         gen.writeEndObject();
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java
index 8724ea45cfa..1d9ab77b635 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/SymbolUtil.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.utils.DateTimeUtils;
 import com.google.common.collect.BoundType;
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rex.RexUnknownAs;
 import org.apache.calcite.sql.SqlJsonConstructorNullClause;
 import org.apache.calcite.sql.SqlJsonEmptyOrError;
 import org.apache.calcite.sql.SqlJsonExistsErrorBehavior;
@@ -473,6 +474,11 @@ public final class SymbolUtil {
         // BOUND
         addSymbolMapping(null, null, BoundType.OPEN, "BOUND", "OPEN");
         addSymbolMapping(null, null, BoundType.CLOSED, "BOUND", "CLOSED");
+
+        // UNKNOWN_AS
+        addSymbolMapping(null, null, RexUnknownAs.TRUE, "UNKNOWN_AS", "TRUE");
+        addSymbolMapping(null, null, RexUnknownAs.FALSE, "UNKNOWN_AS", "FALSE");
+        addSymbolMapping(null, null, RexUnknownAs.UNKNOWN, "UNKNOWN_AS", "UNKNOWN");
     }
 
     /**
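
The new UNKNOWN_AS mappings make RexUnknownAs round-trippable through the compiled
plan JSON. A sketch of the round trip, assuming the calciteToSerializable and
serializableToCalcite helpers used elsewhere in this commit (their exact import
locations are an assumption here):

    import org.apache.calcite.rex.RexUnknownAs;

    class SymbolRoundTripSketch {
        static boolean roundTripsTrue() {
            SerializableSymbol symbol = SymbolUtil.calciteToSerializable(RexUnknownAs.TRUE);
            RexUnknownAs back =
                    SymbolUtil.serializableToCalcite(RexUnknownAs.class, symbol.getValue());
            return back == RexUnknownAs.TRUE; // serialized as "UNKNOWN_AS" / "TRUE"
        }
    }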
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index eddbcab7247..bdeec088414 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -9,8 +9,8 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.google.guava:guava:29.0-jre
 - com.google.guava:failureaccess:1.0.1
 - com.esri.geometry:esri-geometry-api:2.2.0
-- org.apache.calcite:calcite-core:1.29.0
-- org.apache.calcite:calcite-linq4j:1.29.0
+- org.apache.calcite:calcite-core:1.30.0
+- org.apache.calcite:calcite-linq4j:1.30.0
 - org.apache.calcite.avatica:avatica-core:1.20.0
 - commons-codec:commons-codec:1.15
 - commons-io:commons-io:2.11.0
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
index a7e7ff3ea2a..652a177bb98 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
@@ -174,6 +174,7 @@ object FlinkLogicalRelFactories {
     def createCorrelate(
         left: RelNode,
         right: RelNode,
+        hints: util.List[RelHint],
         correlationId: CorrelationId,
         requiredColumns: ImmutableBitSet,
         joinType: JoinRelType): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index e7cfb468a60..82fd2a9a682 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -422,7 +422,11 @@ class FlinkTypeFactory(
         new GenericRelDataType(generic.genericType, isNullable, typeSystem)
 
       case it: TimeIndicatorRelDataType =>
-        new TimeIndicatorRelDataType(it.typeSystem, it.originalType, isNullable, it.isEventTime)
+        new TimeIndicatorRelDataType(
+          it.typeSystemField,
+          it.originalType,
+          isNullable,
+          it.isEventTime)
 
       // for nested rows we keep the nullability property,
       // top-level rows fall back to Calcite's default handling
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 47dd5734402..1ba6643dc29 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -23,6 +23,7 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.sql.parser.dql.SqlRichExplain
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
 import org.apache.flink.util.Preconditions.checkArgument
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala
index 3789f24ab9b..a4076db6adf 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala
@@ -28,11 +28,11 @@ import java.lang
  * basic SQL type.
  */
 class TimeIndicatorRelDataType(
-    val typeSystem: RelDataTypeSystem,
+    val typeSystemField: RelDataTypeSystem,
     val originalType: BasicSqlType,
     val nullable: Boolean,
     val isEventTime: Boolean)
-  extends BasicSqlType(typeSystem, originalType.getSqlTypeName, originalType.getPrecision) {
+  extends BasicSqlType(typeSystemField, originalType.getSqlTypeName, originalType.getPrecision) {
 
   this.isNullable = nullable
   computeDigest()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java
index 23652b1fcad..b763d92cdec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest.java
@@ -114,4 +114,18 @@ public class CalcJsonPlanTest extends TableTestBase {
                         + "from MyTable where "
                         + "(udf1(a) > 0 or (a * b) < 100) and b > 10");
     }
+
+    @Test
+    public void testSarg() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql = "insert into MySink SELECT a from MyTable where a = 1 or a = 2 or a is null";
+        util.verifyJsonPlan(sql);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
new file mode 100644
index 00000000000..1320fa24920
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/** Test for Sarg JsonPlan ser/de. */
+public class SargJsonPlanITCase extends JsonPlanTestBase {
+    @Test
+    public void testSarg() throws ExecutionException, InterruptedException {
+        List<Row> data =
+                Arrays.asList(Row.of(1), Row.of(2), Row.of((Integer) null), Row.of(4), Row.of(5));
+        createTestValuesSourceTable("MyTable", data, "a int");
+        createTestNonInsertOnlyValuesSinkTable("`result`", "a int");
+        String sql =
+                "insert into `result` SELECT a\n"
+                        + "FROM MyTable WHERE a = 1 OR a = 2 OR a IS NULL";
+        compileSqlAndExecutePlan(sql).await();
+        List<String> expected = Arrays.asList("+I[1]", "+I[2]", "+I[null]");
+        assertResult(expected, TestValuesTableFactory.getResults("result"));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest_jsonplan/testSarg.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest_jsonplan/testSarg.out
new file mode 100644
index 00000000000..f39a0d6b95d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcJsonPlanTest_jsonplan/testSarg.out
@@ -0,0 +1,157 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ] ],
+        "producedType" : "ROW<`a` BIGINT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a], metadata=[]]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "INTERNAL",
+      "internalName" : "$SEARCH$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "sarg" : {
+          "ranges" : [ {
+            "lower" : {
+              "value" : 1,
+              "boundType" : "CLOSED"
+            },
+            "upper" : {
+              "value" : 1,
+              "boundType" : "CLOSED"
+            }
+          }, {
+            "lower" : {
+              "value" : 2,
+              "boundType" : "CLOSED"
+            },
+            "upper" : {
+              "value" : 2,
+              "boundType" : "CLOSED"
+            }
+          } ],
+          "nullAs" : "TRUE"
+        },
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT>",
+    "description" : "Calc(select=[a], where=[SEARCH(a, Sarg[1, 2; NULL AS TRUE])])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
index 06e50d09bb2..c3ed918655d 100644
--- a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,6 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.jayway.jsonpath:json-path:2.6.0
+- com.jayway.jsonpath:json-path:2.7.0
 - org.codehaus.janino:janino:3.1.9
 - org.codehaus.janino:commons-compiler:3.1.9
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index ab5e343fdfb..0efbed1fad7 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -77,12 +77,12 @@ under the License.
        </dependencyManagement>
 
        <properties>
-               <calcite.version>1.29.0</calcite.version>
-               <!-- Calcite 1.29.0 depends on 3.1.6,
+               <calcite.version>1.30.0</calcite.version>
+               <!-- Calcite 1.30.0 depends on 3.1.6,
                at the same time minimum 3.1.x Janino version passing Flink tests is 3.1.9,
                more details are in FLINK-27995 -->
                <janino.version>3.1.9</janino.version>
-               <jsonpath.version>2.6.0</jsonpath.version>
+               <jsonpath.version>2.7.0</jsonpath.version>
                <guava.version>29.0-jre</guava.version>
        </properties>
 </project>