This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 51e5ce6f4b9 Revert "[FLINK-33064][table-planner] Improve the error
message when the lookup source is used as the scan source (#23377)" (#23463)
51e5ce6f4b9 is described below
commit 51e5ce6f4b9c889e7e253b3b8eba3ca8ee6af4e8
Author: yunhong <[email protected]>
AuthorDate: Tue Sep 26 17:09:55 2023 +0800
Revert "[FLINK-33064][table-planner] Improve the error message when the
lookup source is used as the scan source (#23377)" (#23463)
This reverts commit be509e6d67471d886e58d3ddea6ddd3627a191a8.
Co-authored-by: zhengyunhong.zyh <[email protected]>
---
.../TestDynamicTableSourceOnlyFactory.java | 41 +----
.../apache/calcite/sql2rel/SqlToRelConverter.java | 172 ++++-----------------
.../planner/plan/utils/TemporalTableJoinUtil.java | 14 --
.../rules/common/CommonTemporalTableJoinRule.scala | 17 +-
.../table/planner/catalog/CatalogTableITCase.scala | 52 +++----
.../planner/plan/stream/sql/TableScanTest.scala | 8 +-
6 files changed, 68 insertions(+), 236 deletions(-)
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
index 6d3af4650d3..7aa78b6492e 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
@@ -19,10 +19,7 @@
package org.apache.flink.table.factories;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
import java.util.Collections;
import java.util.Set;
@@ -35,14 +32,9 @@ public final class TestDynamicTableSourceOnlyFactory
implements DynamicTableSour
public static final String IDENTIFIER = "source-only";
- private static final ConfigOption<Boolean> BOUNDED =
- ConfigOptions.key("bounded").booleanType().defaultValue(false);
-
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
- FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
- boolean isBounded = helper.getOptions().get(BOUNDED);
- return new MockedScanTableSource(isBounded);
+ return null;
}
@Override
@@ -57,35 +49,6 @@ public final class TestDynamicTableSourceOnlyFactory
implements DynamicTableSour
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Collections.singleton(BOUNDED);
- }
-
- /** A mocked {@link ScanTableSource} for validation test. */
- private static class MockedScanTableSource implements ScanTableSource {
- private final boolean isBounded;
-
- private MockedScanTableSource(boolean isBounded) {
- this.isBounded = isBounded;
- }
-
- @Override
- public DynamicTableSource copy() {
- return null;
- }
-
- @Override
- public String asSummaryString() {
- return null;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return ChangelogMode.insertOnly();
- }
-
- @Override
- public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
- return () -> isBounded;
- }
+ return Collections.emptySet();
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 67663ce0984..71cafa91308 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -17,16 +17,11 @@
package org.apache.calcite.sql2rel;
import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.TimestampData;
import
org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogSnapshotReader;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import com.google.common.base.Preconditions;
@@ -69,7 +64,6 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.Sample;
import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
@@ -241,17 +235,12 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* <p>FLINK modifications are at lines
*
* <ol>
- * <li>Added in FLINK-29081, FLINK-28682: Lines 667 ~ 677
- * <li>Added in FLINK-28682: Lines 2300 ~ 2317
- * <li>Added in FLINK-28682: Lines 2354 ~ 2382
- * <li>Added in FLINK-32474: Lines 2932 ~ 2944
- * <li>Added in FLINK-32474: Lines 3057 ~ 3093
- * <li>Added in FLINK-20873: Lines 5594 ~ 5603
- * <li>Added in FLINK-33064: Lines 2431 ~ 2475
- * <li>Added in FLINK-33064: Lines 2481 ~ 2570
- * <li>Added in FLINK-33064: Lines 2970 ~ 2978
- * <li>Added in FLINK-33064: Lines 3050 ~ 3052
- * <li>Added in FLINK-33064: Lines 4102 ~ 4107
+ * <li>Added in FLINK-29081, FLINK-28682: Lines 644 ~ 654
+ * <li>Added in FLINK-28682: Lines 2277 ~ 2294
+ * <li>Added in FLINK-28682: Lines 2331 ~ 2359
+ * <li>Added in FLINK-20873: Lines 5484 ~ 5493
+ * <li>Added in FLINK-32474: Lines 2841 ~ 2853
+ * <li>Added in FLINK-32474: Lines 2953 ~ 2987
* </ol>
*/
@SuppressWarnings("UnstableApiUsage")
@@ -2428,17 +2417,25 @@ public class SqlToRelConverter {
throw new AssertionError("unknown TABLESAMPLE type: " +
sampleSpec);
}
return;
- // ----- FLINK MODIFICATION BEGIN -----
+
case TABLE_REF:
- convertTableRef(bb, from, false);
+ call = (SqlCall) from;
+ convertIdentifier(bb, call.operand(0), null, call.operand(1),
null);
return;
case IDENTIFIER:
- convertIdentifier(bb, (SqlIdentifier) from, null, null, null,
false);
+ convertIdentifier(bb, (SqlIdentifier) from, null, null, null);
return;
case EXTEND:
- convertExtend(bb, from, false);
+ call = (SqlCall) from;
+ final SqlNode operand0 = call.getOperandList().get(0);
+ final SqlIdentifier id =
+ operand0.getKind() == SqlKind.TABLE_REF
+ ? ((SqlCall) operand0).operand(0)
+ : (SqlIdentifier) operand0;
+ SqlNodeList extendedColumns = (SqlNodeList)
call.getOperandList().get(1);
+ convertIdentifier(bb, id, extendedColumns, null, null);
return;
case SNAPSHOT:
@@ -2469,106 +2466,19 @@ public class SqlToRelConverter {
return;
case COLLECTION_TABLE:
- convertCollectionTable(bb, from, false);
+ call = (SqlCall) from;
+
+ // Dig out real call; TABLE() wrapper is just syntactic.
+ assert call.getOperandList().size() == 1;
+ final SqlCall call2 = call.operand(0);
+ convertCollectionTable(bb, call2);
return;
- // ----- FLINK MODIFICATION END -----
default:
throw new AssertionError("not a join operator " + from);
}
}
- // ----- FLINK MODIFICATION BEGIN -----
- private void convertTableRef(Blackboard bb, SqlNode from, boolean
isTemporalJoinRightSide) {
- SqlCall call = (SqlCall) from;
- convertIdentifier(
- bb, call.operand(0), null, call.operand(1), null,
isTemporalJoinRightSide);
- }
-
- private void convertExtend(Blackboard bb, SqlNode from, boolean
isTemporalJoinRightSide) {
- SqlCall call = (SqlCall) from;
- final SqlNode operand0 = call.getOperandList().get(0);
- final SqlIdentifier id =
- operand0.getKind() == SqlKind.TABLE_REF
- ? ((SqlCall) operand0).operand(0)
- : (SqlIdentifier) operand0;
- SqlNodeList extendedColumns = (SqlNodeList)
call.getOperandList().get(1);
- convertIdentifier(bb, id, extendedColumns, null, null,
isTemporalJoinRightSide);
- }
-
- /**
- * Converts a FROM clause into a relational expression for the right side
in temporal join. The
- * right side in temporary join is a special type of source which can
support {@link
- * org.apache.flink.table.connector.source.LookupTableSource}. So we need
to distinguish it from
- * the regular scan table source during convert.
- *
- * @param bb Scope within which to resolve identifiers
- * @param from FROM clause of a query. Examples include:
- * @param fieldNames Field aliases, usually come from AS clause, or null
- */
- private void convertTemporalJoinRightSide(
- Blackboard bb, @Nullable SqlNode from, @Nullable List<String>
fieldNames) {
- if (from == null) {
- bb.setRoot(LogicalValues.createOneRow(cluster), false);
- return;
- }
-
- switch (from.getKind()) {
- case TABLE_REF:
- convertTableRef(bb, from, true);
- return;
- case IDENTIFIER:
- convertIdentifier(bb, (SqlIdentifier) from, null, null, null,
true);
- return;
- case EXTEND:
- convertExtend(bb, from, true);
- return;
- case COLLECTION_TABLE:
- convertCollectionTable(bb, from, true);
- return;
- default:
- convertFrom(bb, from, fieldNames);
- }
- }
-
- /**
- * Validate the input {@link RelNode} to judge if it is a legal source.
For example, for a table
- * source that only implements the {@link
- * org.apache.flink.table.connector.source.LookupTableSource}, and doesn't
implement the {@link
- * ScanTableSource}, it can only be used as a right table ref in temporal
join or lookup join
- * and cannot be used as a scan table.
- */
- private void validateScan(RelNode relNode, boolean
isTemporalJoinRightSide) {
- relNode.accept(
- new RelShuttleImpl() {
- @Override
- public RelNode visit(TableScan scan) {
- final RelOptTable table = scan.getTable();
- if (table instanceof TableSourceTable) {
- final TableSourceTable sourceTable =
-
scan.getTable().unwrap(TableSourceTable.class);
- assert sourceTable != null;
- final DynamicTableSource dynamicTableSource =
sourceTable.tableSource();
- if (!isTemporalJoinRightSide
- && !(dynamicTableSource instanceof
ScanTableSource)) {
- throw new ValidationException(
- String.format(
- "The specified table source %s
doesn't extend %s and can not be used "
- + "as the scan
source.\n"
- + "Hint: You can read
the data from the source as a dim table "
- + "with the look up
join syntax. Otherwise, please refer to "
- + "the document and
change the type of the connector to a "
- + "source table that
supports direct reads.",
-
sourceTable.contextResolvedTable().getIdentifier(),
-
ScanTableSource.class.getSimpleName()));
- }
- }
- return scan;
- }
- });
- }
- // ----- FLINK MODIFICATION END -----
-
private void convertUnnest(Blackboard bb, SqlCall call, @Nullable
List<String> fieldNames) {
final List<SqlNode> nodes = call.getOperandList();
final SqlUnnestOperator operator = (SqlUnnestOperator)
call.getOperator();
@@ -2920,8 +2830,7 @@ public class SqlToRelConverter {
SqlIdentifier id,
@Nullable SqlNodeList extendedColumns,
@Nullable SqlNodeList tableHints,
- @Nullable SchemaVersion schemaVersion,
- boolean isTemporalJoinRightSide) {
+ @Nullable SchemaVersion schemaVersion) {
final SqlValidatorNamespace fromNamespace = getNamespace(id).resolve();
if (fromNamespace.getNode() != null) {
convertFrom(bb, fromNamespace.getNode());
@@ -2955,7 +2864,7 @@ public class SqlToRelConverter {
hintStrategies.apply(
SqlUtil.getRelHint(hintStrategies, tableHints),
LogicalTableScan.create(cluster, table,
ImmutableList.of()));
- final RelNode tableRel = toRel(table, hints, isTemporalJoinRightSide);
+ final RelNode tableRel = toRel(table, hints);
bb.setRoot(tableRel, true);
if (RelOptUtil.isPureOrder(castNonNull(bb.root)) &&
removeSortInSubQuery(bb.top)) {
@@ -2967,15 +2876,7 @@ public class SqlToRelConverter {
}
}
- // ----- FLINK MODIFICATION BEGIN -----
- protected void convertCollectionTable(
- Blackboard bb, SqlNode from, boolean isTemporalJoinRightSide) {
- SqlCall sqlCall = (SqlCall) from;
-
- // Dig out real call; TABLE() wrapper is just syntactic.
- assert sqlCall.getOperandList().size() == 1;
- final SqlCall call = sqlCall.operand(0);
- // ----- FLINK MODIFICATION END -----
+ protected void convertCollectionTable(Blackboard bb, SqlCall call) {
final SqlOperator operator = call.getOperator();
if (operator == SqlStdOperatorTable.TABLESAMPLE) {
final String sampleName =
SqlLiteral.unchain(call.operand(0)).getValueAs(String.class);
@@ -3010,7 +2911,7 @@ public class SqlToRelConverter {
RelOptTable relOptTable =
RelOptTableImpl.create(
null, rowType, udf.getNameAsId().names, table,
expressionFunction);
- RelNode converted = toRel(relOptTable, ImmutableList.of(),
isTemporalJoinRightSide);
+ RelNode converted = toRel(relOptTable, ImmutableList.of());
bb.setRoot(converted, true);
return;
}
@@ -3047,22 +2948,15 @@ public class SqlToRelConverter {
final SqlSnapshot snapshot = (SqlSnapshot) call;
final RexNode period = bb.convertExpression(snapshot.getPeriod());
- // ----- FLINK MODIFICATION BEGIN -----
- boolean isTemporalJoin =
TemporalTableJoinUtil.isTemporalJoinSupportPeriod(period);
- // ----- FLINK MODIFICATION END -----
-
// convert inner query, could be a table name or a derived table
SqlNode expr = snapshot.getTableRef();
-
// ----- FLINK MODIFICATION BEGIN -----
SqlNode tableRef = snapshot.getTableRef();
// since we have reduced the period of SqlSnapshot in the validate
phase, we only need to
// check whether the period is a RexLiteral.
// in most cases, tableRef is a SqlBasicCall and the first operand is
a SqlIdentifier.
// when using SQL Hints, tableRef will be a SqlTableRef.
- if (isTemporalJoin) {
- convertTemporalJoinRightSide(bb, expr, Collections.emptyList());
- } else if (((tableRef instanceof SqlBasicCall
+ if (((tableRef instanceof SqlBasicCall
&& ((SqlBasicCall) tableRef).operand(0)
instanceof SqlIdentifier)
|| (tableRef instanceof SqlTableRef))
&& period instanceof RexLiteral) {
@@ -3086,7 +2980,7 @@ public class SqlToRelConverter {
? ((SqlBasicCall) tableRef).operand(0)
: ((SqlTableRef) tableRef).operand(0);
SchemaVersion schemaVersion =
TimestampSchemaVersion.of(timeTravelTimestamp);
- convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion,
false);
+ convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion);
} else {
convertFrom(bb, expr);
}
@@ -4099,12 +3993,8 @@ public class SqlToRelConverter {
return ViewExpanders.toRelContext(viewExpander, cluster, hints);
}
- // ----- FLINK MODIFICATION BEGIN -----
- public RelNode toRel(
- final RelOptTable table, final List<RelHint> hints, boolean
isTemporalJoinRightSide) {
+ public RelNode toRel(final RelOptTable table, final List<RelHint> hints) {
final RelNode scan = table.toRel(createToRelContext(hints));
- validateScan(scan, isTemporalJoinRightSide);
- // ----- FLINK MODIFICATION END -----
final InitializerExpressionFactory ief =
table.maybeUnwrap(InitializerExpressionFactory.class)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
index 1c0585bef6e..38b5c705b0d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
@@ -18,11 +18,7 @@
package org.apache.flink.table.planner.plan.utils;
-import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
-
import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
@@ -70,14 +66,4 @@ public class TemporalTableJoinUtil {
return call.getOperator() == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION()
&& call.operands.size() == 5;
}
-
- public static boolean isTemporalJoinSupportPeriod(RexNode period) {
- // it should be left table's field and is a time attribute
- if (period instanceof RexFieldAccess) {
- RexFieldAccess rexFieldAccess = (RexFieldAccess) period;
- return rexFieldAccess.getType() instanceof TimeIndicatorRelDataType
- && rexFieldAccess.getReferenceExpr() instanceof
RexCorrelVariable;
- }
- return false;
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
index b876b4d4b9b..a755dfeeb2e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
@@ -22,13 +22,13 @@ import
org.apache.flink.table.connector.source.LookupTableSource
import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan,
FlinkLogicalRel, FlinkLogicalSnapshot, FlinkLogicalTableSourceScan}
import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalLookupJoin,
StreamPhysicalTemporalJoin}
import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable,
TableSourceTable, TimeIndicatorRelDataType}
-import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil
import org.apache.flink.table.sources.LookupableTableSource
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess}
/**
* Base implementation that matches temporal join node.
@@ -42,12 +42,15 @@ trait CommonTemporalTableJoinRule {
protected def matches(snapshot: FlinkLogicalSnapshot): Boolean = {
// period specification check
- val isTemporalJoinSnapshot =
- TemporalTableJoinUtil.isTemporalJoinSupportPeriod(snapshot.getPeriod)
- if (!isTemporalJoinSnapshot) {
- throw new TableException(
- "Temporal table join currently only supports " +
- "'FOR SYSTEM_TIME AS OF' left table's time attribute field.")
+ snapshot.getPeriod match {
+ // it should be left table's field and is a time attribute
+ case r: RexFieldAccess
+ if r.getType.isInstanceOf[TimeIndicatorRelDataType] &&
+ r.getReferenceExpr.isInstanceOf[RexCorrelVariable] => // pass
+ case _ =>
+ throw new TableException(
+ "Temporal table join currently only supports " +
+ "'FOR SYSTEM_TIME AS OF' left table's time attribute field.")
}
true
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 14aafa094e5..88fd58c8fdc 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -1117,17 +1117,15 @@ class CatalogTableITCase(isStreamingMode: Boolean)
extends AbstractTestBase {
@Test
def testCreateViewAndShowCreateTable(): Unit = {
- val isBounded = !isStreamingMode
val createTableDDL =
- s""" |create table `source` (
- | `id` bigint not null,
- | `group` string not null,
- | `score` double
- |) with (
- | 'connector' = 'source-only',
- | 'bounded' = '$isBounded'
- |)
- |""".stripMargin
+ """ |create table `source` (
+ | `id` bigint not null,
+ | `group` string not null,
+ | `score` double
+ |) with (
+ | 'connector' = 'source-only'
+ |)
+ |""".stripMargin
val createViewDDL =
""" |create view `tmp` as
|select `group`, avg(`score`) as avg_score
@@ -1146,15 +1144,13 @@ class CatalogTableITCase(isStreamingMode: Boolean)
extends AbstractTestBase {
@Test
def testAlterViewRename(): Unit = {
- val isBounded = !isStreamingMode
- tableEnv.executeSql(s"""
- | CREATE TABLE T (
- | id INT
- | ) WITH (
- | 'connector' = 'source-only',
- | 'bounded' = '$isBounded'
- | )
- |""".stripMargin)
+ tableEnv.executeSql("""
+ | CREATE TABLE T (
+ | id INT
+ | ) WITH (
+ | 'connector' = 'source-only'
+ | )
+ |""".stripMargin)
tableEnv.executeSql("CREATE VIEW V AS SELECT * FROM T")
tableEnv.executeSql("ALTER VIEW V RENAME TO V2")
@@ -1163,16 +1159,14 @@ class CatalogTableITCase(isStreamingMode: Boolean)
extends AbstractTestBase {
@Test
def testAlterViewAs(): Unit = {
- val isBounded = !isStreamingMode
- tableEnv.executeSql(s"""
- | CREATE TABLE T (
- | a INT,
- | b INT
- | ) WITH (
- | 'connector' = 'source-only',
- | 'bounded' = '$isBounded'
- | )
- |""".stripMargin)
+ tableEnv.executeSql("""
+ | CREATE TABLE T (
+ | a INT,
+ | b INT
+ | ) WITH (
+ | 'connector' = 'source-only'
+ | )
+ |""".stripMargin)
tableEnv.executeSql("CREATE VIEW V AS SELECT a FROM T")
tableEnv.executeSql("ALTER VIEW V AS SELECT b FROM T")
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index eebafbeecd3..d228c206aa5 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -731,12 +731,8 @@ class TableScanTest extends TableTestBase {
| 'table-source-class' =
'${classOf[MockedLookupTableSource].getName}'
|)
""".stripMargin)
- thrown.expect(classOf[ValidationException])
- thrown.expectMessage(
- "The specified table source `default_catalog`.`default_database`.`src` "
+
- "doesn't extend ScanTableSource and can not be used as the scan
source.\nHint: You can read the data " +
- "from the source as a dim table with the look up join syntax.
Otherwise, please refer to the document " +
- "and change the type of the connector to a source table that supports
direct reads.")
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage("Cannot generate a valid execution plan for the given
query")
util.verifyRelPlan("SELECT * FROM src", ExplainDetail.CHANGELOG_MODE)
}