[
https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu reassigned FLINK-15935:
-------------------------------
Assignee: Jark Wu
> Unable to use watermark when depends both on flink planner and blink planner
> ----------------------------------------------------------------------------
>
> Key: FLINK-15935
> URL: https://issues.apache.org/jira/browse/FLINK-15935
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Jeff Zhang
> Assignee: Jark Wu
> Priority: Blocker
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Run the following code in the `flink-examples-table` module (the code must be
> placed in this module to reproduce the problem):
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.flink.table.examples.java;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> /**
> * javadoc.
> */
> public class TableApiExample {
> /**
> *
> * @param args
> * @throws Exception
> */
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
> bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
> " status STRING,\n" +
> " direction STRING,\n" +
> " event_ts TIMESTAMP(3),\n" +
> " WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> " 'connector.type' = 'kafka', \n" +
> " 'connector.version' = 'universal', \n" +
> " 'connector.topic' = 'generated.events2',\n" +
> " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
> " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'connector.properties.group.id' = 'testGroup',\n" +
> " 'format.type'='json',\n" +
> " 'update-mode' = 'append'\n" +
> ")\n");
> }
> }
> {code}
>
> Running it fails with the following error:
> {code:java}
> Exception in thread "main"
> org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to
> line 5, column 38: Unknown identifier 'event_ts'
> Exception in thread "main"
> org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to
> line 5, column 38: Unknown identifier 'event_ts' at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at
> org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
> at java.util.Optional.ifPresent(Optional.java:159) at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> org.apache.flink.table.examples.java.TableApiExample.main(TableApiExample.java:43)
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown
> identifier 'event_ts' at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 25 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)