This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 51e5ce6f4b9 Revert "[FLINK-33064][table-planner] Improve the error message when the lookup source is used as the scan source (#23377)" (#23463)
51e5ce6f4b9 is described below

commit 51e5ce6f4b9c889e7e253b3b8eba3ca8ee6af4e8
Author: yunhong <[email protected]>
AuthorDate: Tue Sep 26 17:09:55 2023 +0800

    Revert "[FLINK-33064][table-planner] Improve the error message when the lookup source is used as the scan source (#23377)" (#23463)
    
    This reverts commit be509e6d67471d886e58d3ddea6ddd3627a191a8.
    
    Co-authored-by: zhengyunhong.zyh <[email protected]>
---
 .../TestDynamicTableSourceOnlyFactory.java         |  41 +----
 .../apache/calcite/sql2rel/SqlToRelConverter.java  | 172 ++++-----------------
 .../planner/plan/utils/TemporalTableJoinUtil.java  |  14 --
 .../rules/common/CommonTemporalTableJoinRule.scala |  17 +-
 .../table/planner/catalog/CatalogTableITCase.scala |  52 +++----
 .../planner/plan/stream/sql/TableScanTest.scala    |   8 +-
 6 files changed, 68 insertions(+), 236 deletions(-)

diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
index 6d3af4650d3..7aa78b6492e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
@@ -19,10 +19,7 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
 
 import java.util.Collections;
 import java.util.Set;
@@ -35,14 +32,9 @@ public final class TestDynamicTableSourceOnlyFactory implements DynamicTableSour
 
     public static final String IDENTIFIER = "source-only";
 
-    private static final ConfigOption<Boolean> BOUNDED =
-            ConfigOptions.key("bounded").booleanType().defaultValue(false);
-
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
-        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
-        boolean isBounded = helper.getOptions().get(BOUNDED);
-        return new MockedScanTableSource(isBounded);
+        return null;
     }
 
     @Override
@@ -57,35 +49,6 @@ public final class TestDynamicTableSourceOnlyFactory implements DynamicTableSour
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return Collections.singleton(BOUNDED);
-    }
-
-    /** A mocked {@link ScanTableSource} for validation test. */
-    private static class MockedScanTableSource implements ScanTableSource {
-        private final boolean isBounded;
-
-        private MockedScanTableSource(boolean isBounded) {
-            this.isBounded = isBounded;
-        }
-
-        @Override
-        public DynamicTableSource copy() {
-            return null;
-        }
-
-        @Override
-        public String asSummaryString() {
-            return null;
-        }
-
-        @Override
-        public ChangelogMode getChangelogMode() {
-            return ChangelogMode.insertOnly();
-        }
-
-        @Override
-        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
-            return () -> isBounded;
-        }
+        return Collections.emptySet();
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 67663ce0984..71cafa91308 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -17,16 +17,11 @@
 package org.apache.calcite.sql2rel;
 
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.TimestampData;
 import 
org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
 import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogSnapshotReader;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 
 import com.google.common.base.Preconditions;
@@ -69,7 +64,6 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
@@ -241,17 +235,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Added in FLINK-29081, FLINK-28682: Lines 667 ~ 677
- *   <li>Added in FLINK-28682: Lines 2300 ~ 2317
- *   <li>Added in FLINK-28682: Lines 2354 ~ 2382
- *   <li>Added in FLINK-32474: Lines 2932 ~ 2944
- *   <li>Added in FLINK-32474: Lines 3057 ~ 3093
- *   <li>Added in FLINK-20873: Lines 5594 ~ 5603
- *   <li>Added in FLINK-33064: Lines 2431 ~ 2475
- *   <li>Added in FLINK-33064: Lines 2481 ~ 2570
- *   <li>Added in FLINK-33064: Lines 2970 ~ 2978
- *   <li>Added in FLINK-33064: Lines 3050 ~ 3052
- *   <li>Added in FLINK-33064: Lines 4102 ~ 4107
+ *   <li>Added in FLINK-29081, FLINK-28682: Lines 644 ~ 654
+ *   <li>Added in FLINK-28682: Lines 2277 ~ 2294
+ *   <li>Added in FLINK-28682: Lines 2331 ~ 2359
+ *   <li>Added in FLINK-20873: Lines 5484 ~ 5493
+ *   <li>Added in FLINK-32474: Lines 2841 ~ 2853
+ *   <li>Added in FLINK-32474: Lines 2953 ~ 2987
  * </ol>
  */
 @SuppressWarnings("UnstableApiUsage")
@@ -2428,17 +2417,25 @@ public class SqlToRelConverter {
                     throw new AssertionError("unknown TABLESAMPLE type: " + 
sampleSpec);
                 }
                 return;
-                // ----- FLINK MODIFICATION BEGIN -----
+
             case TABLE_REF:
-                convertTableRef(bb, from, false);
+                call = (SqlCall) from;
+                convertIdentifier(bb, call.operand(0), null, call.operand(1), 
null);
                 return;
 
             case IDENTIFIER:
-                convertIdentifier(bb, (SqlIdentifier) from, null, null, null, 
false);
+                convertIdentifier(bb, (SqlIdentifier) from, null, null, null);
                 return;
 
             case EXTEND:
-                convertExtend(bb, from, false);
+                call = (SqlCall) from;
+                final SqlNode operand0 = call.getOperandList().get(0);
+                final SqlIdentifier id =
+                        operand0.getKind() == SqlKind.TABLE_REF
+                                ? ((SqlCall) operand0).operand(0)
+                                : (SqlIdentifier) operand0;
+                SqlNodeList extendedColumns = (SqlNodeList) 
call.getOperandList().get(1);
+                convertIdentifier(bb, id, extendedColumns, null, null);
                 return;
 
             case SNAPSHOT:
@@ -2469,106 +2466,19 @@ public class SqlToRelConverter {
                 return;
 
             case COLLECTION_TABLE:
-                convertCollectionTable(bb, from, false);
+                call = (SqlCall) from;
+
+                // Dig out real call; TABLE() wrapper is just syntactic.
+                assert call.getOperandList().size() == 1;
+                final SqlCall call2 = call.operand(0);
+                convertCollectionTable(bb, call2);
                 return;
 
-                // ----- FLINK MODIFICATION END -----
             default:
                 throw new AssertionError("not a join operator " + from);
         }
     }
 
-    // ----- FLINK MODIFICATION BEGIN -----
-    private void convertTableRef(Blackboard bb, SqlNode from, boolean 
isTemporalJoinRightSide) {
-        SqlCall call = (SqlCall) from;
-        convertIdentifier(
-                bb, call.operand(0), null, call.operand(1), null, 
isTemporalJoinRightSide);
-    }
-
-    private void convertExtend(Blackboard bb, SqlNode from, boolean 
isTemporalJoinRightSide) {
-        SqlCall call = (SqlCall) from;
-        final SqlNode operand0 = call.getOperandList().get(0);
-        final SqlIdentifier id =
-                operand0.getKind() == SqlKind.TABLE_REF
-                        ? ((SqlCall) operand0).operand(0)
-                        : (SqlIdentifier) operand0;
-        SqlNodeList extendedColumns = (SqlNodeList) 
call.getOperandList().get(1);
-        convertIdentifier(bb, id, extendedColumns, null, null, 
isTemporalJoinRightSide);
-    }
-
-    /**
-     * Converts a FROM clause into a relational expression for the right side 
in temporal join. The
-     * right side in temporary join is a special type of source which can 
support {@link
-     * org.apache.flink.table.connector.source.LookupTableSource}. So we need 
to distinguish it from
-     * the regular scan table source during convert.
-     *
-     * @param bb Scope within which to resolve identifiers
-     * @param from FROM clause of a query. Examples include:
-     * @param fieldNames Field aliases, usually come from AS clause, or null
-     */
-    private void convertTemporalJoinRightSide(
-            Blackboard bb, @Nullable SqlNode from, @Nullable List<String> 
fieldNames) {
-        if (from == null) {
-            bb.setRoot(LogicalValues.createOneRow(cluster), false);
-            return;
-        }
-
-        switch (from.getKind()) {
-            case TABLE_REF:
-                convertTableRef(bb, from, true);
-                return;
-            case IDENTIFIER:
-                convertIdentifier(bb, (SqlIdentifier) from, null, null, null, 
true);
-                return;
-            case EXTEND:
-                convertExtend(bb, from, true);
-                return;
-            case COLLECTION_TABLE:
-                convertCollectionTable(bb, from, true);
-                return;
-            default:
-                convertFrom(bb, from, fieldNames);
-        }
-    }
-
-    /**
-     * Validate the input {@link RelNode} to judge if it is a legal source. 
For example, for a table
-     * source that only implements the {@link
-     * org.apache.flink.table.connector.source.LookupTableSource}, and doesn't 
implement the {@link
-     * ScanTableSource}, it can only be used as a right table ref in temporal 
join or lookup join
-     * and cannot be used as a scan table.
-     */
-    private void validateScan(RelNode relNode, boolean 
isTemporalJoinRightSide) {
-        relNode.accept(
-                new RelShuttleImpl() {
-                    @Override
-                    public RelNode visit(TableScan scan) {
-                        final RelOptTable table = scan.getTable();
-                        if (table instanceof TableSourceTable) {
-                            final TableSourceTable sourceTable =
-                                    
scan.getTable().unwrap(TableSourceTable.class);
-                            assert sourceTable != null;
-                            final DynamicTableSource dynamicTableSource = 
sourceTable.tableSource();
-                            if (!isTemporalJoinRightSide
-                                    && !(dynamicTableSource instanceof 
ScanTableSource)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "The specified table source %s 
doesn't extend %s and can not be used "
-                                                        + "as the scan 
source.\n"
-                                                        + "Hint: You can read 
the data from the source as a dim table "
-                                                        + "with the look up 
join syntax. Otherwise, please refer to "
-                                                        + "the document and 
change the type of the connector to a "
-                                                        + "source table that 
supports direct reads.",
-                                                
sourceTable.contextResolvedTable().getIdentifier(),
-                                                
ScanTableSource.class.getSimpleName()));
-                            }
-                        }
-                        return scan;
-                    }
-                });
-    }
-    // ----- FLINK MODIFICATION END -----
-
     private void convertUnnest(Blackboard bb, SqlCall call, @Nullable 
List<String> fieldNames) {
         final List<SqlNode> nodes = call.getOperandList();
         final SqlUnnestOperator operator = (SqlUnnestOperator) 
call.getOperator();
@@ -2920,8 +2830,7 @@ public class SqlToRelConverter {
             SqlIdentifier id,
             @Nullable SqlNodeList extendedColumns,
             @Nullable SqlNodeList tableHints,
-            @Nullable SchemaVersion schemaVersion,
-            boolean isTemporalJoinRightSide) {
+            @Nullable SchemaVersion schemaVersion) {
         final SqlValidatorNamespace fromNamespace = getNamespace(id).resolve();
         if (fromNamespace.getNode() != null) {
             convertFrom(bb, fromNamespace.getNode());
@@ -2955,7 +2864,7 @@ public class SqlToRelConverter {
                 hintStrategies.apply(
                         SqlUtil.getRelHint(hintStrategies, tableHints),
                         LogicalTableScan.create(cluster, table, 
ImmutableList.of()));
-        final RelNode tableRel = toRel(table, hints, isTemporalJoinRightSide);
+        final RelNode tableRel = toRel(table, hints);
         bb.setRoot(tableRel, true);
 
         if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && 
removeSortInSubQuery(bb.top)) {
@@ -2967,15 +2876,7 @@ public class SqlToRelConverter {
         }
     }
 
-    // ----- FLINK MODIFICATION BEGIN -----
-    protected void convertCollectionTable(
-            Blackboard bb, SqlNode from, boolean isTemporalJoinRightSide) {
-        SqlCall sqlCall = (SqlCall) from;
-
-        // Dig out real call; TABLE() wrapper is just syntactic.
-        assert sqlCall.getOperandList().size() == 1;
-        final SqlCall call = sqlCall.operand(0);
-        // ----- FLINK MODIFICATION END -----
+    protected void convertCollectionTable(Blackboard bb, SqlCall call) {
         final SqlOperator operator = call.getOperator();
         if (operator == SqlStdOperatorTable.TABLESAMPLE) {
             final String sampleName = 
SqlLiteral.unchain(call.operand(0)).getValueAs(String.class);
@@ -3010,7 +2911,7 @@ public class SqlToRelConverter {
             RelOptTable relOptTable =
                     RelOptTableImpl.create(
                             null, rowType, udf.getNameAsId().names, table, 
expressionFunction);
-            RelNode converted = toRel(relOptTable, ImmutableList.of(), 
isTemporalJoinRightSide);
+            RelNode converted = toRel(relOptTable, ImmutableList.of());
             bb.setRoot(converted, true);
             return;
         }
@@ -3047,22 +2948,15 @@ public class SqlToRelConverter {
         final SqlSnapshot snapshot = (SqlSnapshot) call;
         final RexNode period = bb.convertExpression(snapshot.getPeriod());
 
-        // ----- FLINK MODIFICATION BEGIN -----
-        boolean isTemporalJoin = 
TemporalTableJoinUtil.isTemporalJoinSupportPeriod(period);
-        // ----- FLINK MODIFICATION END -----
-
         // convert inner query, could be a table name or a derived table
         SqlNode expr = snapshot.getTableRef();
-
         // ----- FLINK MODIFICATION BEGIN -----
         SqlNode tableRef = snapshot.getTableRef();
         // since we have reduced the period of SqlSnapshot in the validate 
phase, we only need to
         // check whether the period is a RexLiteral.
         // in most cases, tableRef is a SqlBasicCall and the first operand is 
a SqlIdentifier.
         // when using SQL Hints, tableRef will be a SqlTableRef.
-        if (isTemporalJoin) {
-            convertTemporalJoinRightSide(bb, expr, Collections.emptyList());
-        } else if (((tableRef instanceof SqlBasicCall
+        if (((tableRef instanceof SqlBasicCall
                                 && ((SqlBasicCall) tableRef).operand(0) 
instanceof SqlIdentifier)
                         || (tableRef instanceof SqlTableRef))
                 && period instanceof RexLiteral) {
@@ -3086,7 +2980,7 @@ public class SqlToRelConverter {
                             ? ((SqlBasicCall) tableRef).operand(0)
                             : ((SqlTableRef) tableRef).operand(0);
             SchemaVersion schemaVersion = 
TimestampSchemaVersion.of(timeTravelTimestamp);
-            convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion, 
false);
+            convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion);
         } else {
             convertFrom(bb, expr);
         }
@@ -4099,12 +3993,8 @@ public class SqlToRelConverter {
         return ViewExpanders.toRelContext(viewExpander, cluster, hints);
     }
 
-    // ----- FLINK MODIFICATION BEGIN -----
-    public RelNode toRel(
-            final RelOptTable table, final List<RelHint> hints, boolean 
isTemporalJoinRightSide) {
+    public RelNode toRel(final RelOptTable table, final List<RelHint> hints) {
         final RelNode scan = table.toRel(createToRelContext(hints));
-        validateScan(scan, isTemporalJoinRightSide);
-        // ----- FLINK MODIFICATION END -----
 
         final InitializerExpressionFactory ief =
                 table.maybeUnwrap(InitializerExpressionFactory.class)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
index 1c0585bef6e..38b5c705b0d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.table.planner.plan.utils;
 
-import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
-
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
@@ -70,14 +66,4 @@ public class TemporalTableJoinUtil {
         return call.getOperator() == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION()
                 && call.operands.size() == 5;
     }
-
-    public static boolean isTemporalJoinSupportPeriod(RexNode period) {
-        // it should be left table's field and is a time attribute
-        if (period instanceof RexFieldAccess) {
-            RexFieldAccess rexFieldAccess = (RexFieldAccess) period;
-            return rexFieldAccess.getType() instanceof TimeIndicatorRelDataType
-                    && rexFieldAccess.getReferenceExpr() instanceof 
RexCorrelVariable;
-        }
-        return false;
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
index b876b4d4b9b..a755dfeeb2e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
@@ -22,13 +22,13 @@ import org.apache.flink.table.connector.source.LookupTableSource
 import 
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan,
 FlinkLogicalRel, FlinkLogicalSnapshot, FlinkLogicalTableSourceScan}
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalLookupJoin,
 StreamPhysicalTemporalJoin}
 import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, 
TableSourceTable, TimeIndicatorRelDataType}
-import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil
 import org.apache.flink.table.sources.LookupableTableSource
 
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess}
 
 /**
  * Base implementation that matches temporal join node.
@@ -42,12 +42,15 @@ trait CommonTemporalTableJoinRule {
   protected def matches(snapshot: FlinkLogicalSnapshot): Boolean = {
 
     // period specification check
-    val isTemporalJoinSnapshot =
-      TemporalTableJoinUtil.isTemporalJoinSupportPeriod(snapshot.getPeriod)
-    if (!isTemporalJoinSnapshot) {
-      throw new TableException(
-        "Temporal table join currently only supports " +
-          "'FOR SYSTEM_TIME AS OF' left table's time attribute field.")
+    snapshot.getPeriod match {
+      // it should be left table's field and is a time attribute
+      case r: RexFieldAccess
+          if r.getType.isInstanceOf[TimeIndicatorRelDataType] &&
+            r.getReferenceExpr.isInstanceOf[RexCorrelVariable] => // pass
+      case _ =>
+        throw new TableException(
+          "Temporal table join currently only supports " +
+            "'FOR SYSTEM_TIME AS OF' left table's time attribute field.")
     }
 
     true
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 14aafa094e5..88fd58c8fdc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -1117,17 +1117,15 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
 
   @Test
   def testCreateViewAndShowCreateTable(): Unit = {
-    val isBounded = !isStreamingMode
     val createTableDDL =
-      s""" |create table `source` (
-         |  `id` bigint not null,
-         | `group` string not null,
-         | `score` double
-         |) with (
-         |  'connector' = 'source-only',
-         |  'bounded' = '$isBounded'
-         |)
-         |""".stripMargin
+      """ |create table `source` (
+        |  `id` bigint not null,
+        | `group` string not null,
+        | `score` double
+        |) with (
+        |  'connector' = 'source-only'
+        |)
+        |""".stripMargin
     val createViewDDL =
       """ |create view `tmp` as
         |select `group`, avg(`score`) as avg_score
@@ -1146,15 +1144,13 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
 
   @Test
   def testAlterViewRename(): Unit = {
-    val isBounded = !isStreamingMode
-    tableEnv.executeSql(s"""
-                           | CREATE TABLE T (
-                           |   id INT
-                           | ) WITH (
-                           |   'connector' = 'source-only',
-                           |   'bounded' = '$isBounded'
-                           | )
-                           |""".stripMargin)
+    tableEnv.executeSql("""
+                          | CREATE TABLE T (
+                          |   id INT
+                          | ) WITH (
+                          |   'connector' = 'source-only'
+                          | )
+                          |""".stripMargin)
     tableEnv.executeSql("CREATE VIEW V AS SELECT * FROM T")
 
     tableEnv.executeSql("ALTER VIEW V RENAME TO V2")
@@ -1163,16 +1159,14 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
 
   @Test
   def testAlterViewAs(): Unit = {
-    val isBounded = !isStreamingMode
-    tableEnv.executeSql(s"""
-                           | CREATE TABLE T (
-                           |   a INT,
-                           |   b INT
-                           | ) WITH (
-                           |   'connector' = 'source-only',
-                           |   'bounded' = '$isBounded'
-                           | )
-                           |""".stripMargin)
+    tableEnv.executeSql("""
+                          | CREATE TABLE T (
+                          |   a INT,
+                          |   b INT
+                          | ) WITH (
+                          |   'connector' = 'source-only'
+                          | )
+                          |""".stripMargin)
     tableEnv.executeSql("CREATE VIEW V AS SELECT a FROM T")
 
     tableEnv.executeSql("ALTER VIEW V AS SELECT b FROM T")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index eebafbeecd3..d228c206aa5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -731,12 +731,8 @@ class TableScanTest extends TableTestBase {
                      |  'table-source-class' = 
'${classOf[MockedLookupTableSource].getName}'
                      |)
       """.stripMargin)
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage(
-      "The specified table source `default_catalog`.`default_database`.`src` " 
+
-        "doesn't extend ScanTableSource and can not be used as the scan 
source.\nHint: You can read the data " +
-        "from the source as a dim table with the look up join syntax. 
Otherwise, please refer to the document " +
-        "and change the type of the connector to a source table that supports 
direct reads.")
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Cannot generate a valid execution plan for the given 
query")
     util.verifyRelPlan("SELECT * FROM src", ExplainDetail.CHANGELOG_MODE)
   }
 

Reply via email to