[ 
https://issues.apache.org/jira/browse/FLINK-15935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-15935:
-------------------------------
    Description: 
Run the following code in module `flink-examples-table` (must be under this 
module)
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.examples.java;


import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * javadoc.
 */
public class TableApiExample {

   /**
    *
    * @param args
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {

      StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
      bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
      StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
      bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
         "    status  STRING,\n" +
         "    direction STRING,\n" +
         "    event_ts TIMESTAMP(3),\n" +
         "    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
         ") WITH (\n" +
         "  'connector.type' = 'kafka',       \n" +
         "  'connector.version' = 'universal',    \n" +
         "  'connector.topic' = 'generated.events2',\n" +
         "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
         "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
         "  'connector.properties.group.id' = 'testGroup',\n" +
         "  'format.type'='json',\n" +
         "  'update-mode' = 'append'\n" +
         ")\n");

   }
}
 {code}
 

And hit the following error:
{code:java}
Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: 
From line 5, column 31 to line 5, column 38: Unknown identifier 
'event_ts'Exception in thread "main" 
org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
line 5, column 38: Unknown identifier 'event_ts' at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
 at java.util.Optional.ifPresent(Optional.java:159) at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
 at 
org.apache.flink.table.examples.java.TableApiExample.main(TableApiExample.java:43)Caused
 by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 
'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at 
org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 25 more 
{code}

  was:
Run the following code in module `flink-table-examples` (must be under this 
module)
{code:java}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.examples.java;


import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * javadoc.
 */
public class TableApiExample {

   /**
    *
    * @param args
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {

      StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
      bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
      StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
      bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
         "    status  STRING,\n" +
         "    direction STRING,\n" +
         "    event_ts TIMESTAMP(3),\n" +
         "    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
         ") WITH (\n" +
         "  'connector.type' = 'kafka',       \n" +
         "  'connector.version' = 'universal',    \n" +
         "  'connector.topic' = 'generated.events2',\n" +
         "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
         "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
         "  'connector.properties.group.id' = 'testGroup',\n" +
         "  'format.type'='json',\n" +
         "  'update-mode' = 'append'\n" +
         ")\n");

   }
}
 {code}
 

And hit the following error:
{code:java}

Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: 
From line 5, column 31 to line 5, column 38: Unknown identifier 
'event_ts'Exception in thread "main" 
org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
line 5, column 38: Unknown identifier 'event_ts' at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at 
org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
 at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
 at java.util.Optional.ifPresent(Optional.java:159) at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
 at 
org.apache.flink.table.examples.java.TableApiExample.main(TableApiExample.java:43)Caused
 by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 
'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at 
org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 25 more 
{code}


> Unable to use watermark when depends both on flink planner and blink planner
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-15935
>                 URL: https://issues.apache.org/jira/browse/FLINK-15935
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.10.0
>            Reporter: Jeff Zhang
>            Priority: Blocker
>
> Run the following code in module `flink-examples-table` (must be under this 
> module)
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.flink.table.examples.java;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> /**
>  * javadoc.
>  */
> public class TableApiExample {
>    /**
>     *
>     * @param args
>     * @throws Exception
>     */
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>       EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>       StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>       bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
>          "    status  STRING,\n" +
>          "    direction STRING,\n" +
>          "    event_ts TIMESTAMP(3),\n" +
>          "    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
>          ") WITH (\n" +
>          "  'connector.type' = 'kafka',       \n" +
>          "  'connector.version' = 'universal',    \n" +
>          "  'connector.topic' = 'generated.events2',\n" +
>          "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
>          "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
>          "  'connector.properties.group.id' = 'testGroup',\n" +
>          "  'format.type'='json',\n" +
>          "  'update-mode' = 'append'\n" +
>          ")\n");
>    }
> }
>  {code}
>  
> And hit the following error:
> {code:java}
> Exception in thread "main" 
> org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
> line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" 
> org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to 
> line 5, column 38: Unknown identifier 'event_ts' at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) 
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>  at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>  at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at 
> org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
>  at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
>  at java.util.Optional.ifPresent(Optional.java:159) at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>  at 
> org.apache.flink.table.examples.java.TableApiExample.main(TableApiExample.java:43)Caused
>  by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown 
> identifier 'event_ts' at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) 
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 25 
> more {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to