[
https://issues.apache.org/jira/browse/FLINK-24956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Caizhi Weng closed FLINK-24956.
-------------------------------
Resolution: Duplicate
> SqlSnapshot throws NullPointerException when used in conjunction with CTE
> -------------------------------------------------------------------------
>
> Key: FLINK-24956
> URL: https://issues.apache.org/jira/browse/FLINK-24956
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.14.0, 1.13.3
> Reporter: Yuval Itzchakov
> Priority: Major
>
> Executing the following program will fail with a NullPointerException:
>
> {code:java}
> package foo.bar
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.DataTypes
> import org.apache.flink.table.api.Schema
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> object Test {
> final case class Person(name: String, age: Int)
> def main(args: Array[String]): Unit = {
> val ee = StreamExecutionEnvironment.getExecutionEnvironment
> val te = StreamTableEnvironment.create(ee)
> val personSchema = Schema.newBuilder().column("name",
> DataTypes.STRING()).column("age", DataTypes.INT()).build()
> val x = ee.fromCollection(List(Person("a", 1)))
> te.createTemporaryView(
> "my_table",
> x,
> personSchema
> )
> val y = ee.fromCollection(List(Person("b", 2)))
> te.createTemporaryView(
> "my_table_2",
> y,
> personSchema
> )
> val res =
> te.executeSql("""
> |WITH A AS (
> | select name, age + 1 from my_table
> |),
> |B AS (
> | select name, age + 2 from my_table_2
> |)
> |
> |SELECT A.name, B.age
> |FROM A
> |JOIN B
> |FOR SYSTEM_TIME AS OF PROCTIME() on (A.name = B.name)
> |""".stripMargin)
> res.print()
> }
> }
> {code}
> Stacktrace:
> {code:java}
> Caused by: java.lang.NullPointerException
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSnapshot(SqlValidatorImpl.java:4714)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:986)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at
> org.apache.calcite.sql.validate.WithNamespace.validateImpl(WithNamespace.java:57)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWith(SqlValidatorImpl.java:3744)
> at org.apache.calcite.sql.SqlWith.validate(SqlWith.java:71)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
> {code}
> The reason this fails is that SqlValidatorImpl, when validating the
> SqlSnapshot, always assumes it's operating on a node which has an underlying
> table directly:
> {code:java}
> if (!ns.getTable().isTemporal()) {
> List<String> qualifiedName = ns.getTable().getQualifiedName();
> String tableName = qualifiedName.get(qualifiedName.size() -
> 1);
> throw newValidationError(
> snapshot.getTableRef(),
> Static.RESOURCE.notTemporalTable(tableName));
> }
> {code}
> This is not always the case, e.g. when the snapshot is applied to a CTE.
> A simple fix for this would be to first check `ns.getTable()` against null,
> and only then check its temporality.
>
> The issue here is that this bug is inside Calcite's validator.
> Would love some guidance on how to fix this issue.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)