[ 
https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9329:
---------------------------
    Description: 
Example:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", source);
{code}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when TableSourceUtil calls hasRowtimeAttribute(source) to check for rowtime attributes, it runs the following code:
{code:java}
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}
r.getRowtimeAttributeDescriptors throws an NPE because the source here is configured with a proctime attribute only.
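
One possible guard is sketched below in plain Java. It is only an illustration, not the actual Flink fix: Descriptor and DefinesRowtime are hypothetical stand-ins for Flink's RowtimeAttributeDescriptor and DefinedRowtimeAttributes, and the sketch assumes the NPE comes from the descriptor list being null when only a proctime attribute is configured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RowtimeGuardSketch {

    /** Hypothetical stand-in for Flink's RowtimeAttributeDescriptor. */
    static final class Descriptor {
        private final String attributeName;

        Descriptor(String attributeName) {
            this.attributeName = attributeName;
        }

        String getAttributeName() {
            return attributeName;
        }
    }

    /** Hypothetical stand-in for Flink's DefinedRowtimeAttributes. */
    interface DefinesRowtime {
        List<Descriptor> getRowtimeAttributeDescriptors();
    }

    /** Returns all rowtime attribute names; a null descriptor list counts as "none". */
    static List<String> rowtimeAttributeNames(Object tableSource) {
        if (!(tableSource instanceof DefinesRowtime)) {
            return Collections.emptyList();
        }
        List<Descriptor> descriptors =
                ((DefinesRowtime) tableSource).getRowtimeAttributeDescriptors();
        if (descriptors == null) {
            // Assumed failure mode: a proctime-only source never set its descriptors.
            return Collections.emptyList();
        }
        List<String> names = new ArrayList<>();
        for (Descriptor d : descriptors) {
            names.add(d.getAttributeName());
        }
        return names;
    }

    /** True only if the source declares at least one rowtime attribute. */
    static boolean hasRowtimeAttribute(Object tableSource) {
        return !rowtimeAttributeNames(tableSource).isEmpty();
    }
}
```

An alternative with the same effect would be for the table source itself to return an empty list instead of null, which would leave the calling code unchanged.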

  was:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", source);
{code}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so TableSourceUtil.hasRowtimeAttribute(source) will call:
{code:java}
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}
r.getRowtimeAttributeDescriptors throws an NPE because the source here is configured with a proctime attribute only.


> hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table 
> source
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-9329
>                 URL: https://issues.apache.org/jira/browse/FLINK-9329
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: yuemeng
>            Assignee: yuemeng
>            Priority: Critical
>
> Example:
> {code:java}
> KafkaTableSource source = Kafka010JsonTableSource.builder()
>     .withSchema(TableSchema.builder()
>         .field("sensorId", Types.LONG())
>         .field("temp", Types.DOUBLE())
>         .field("ptime", Types.SQL_TIMESTAMP()).build())
>     .withProctimeAttribute("ptime")
>     .build();
> tableEnv.registerTableSource("flights", source);
> {code}
> Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when TableSourceUtil calls hasRowtimeAttribute(source) to check for rowtime attributes, it runs the following code:
> {code:java}
> /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
> private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
>   tableSource match {
>     case r: DefinedRowtimeAttributes =>
>       r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
>     case _ =>
>       Array()
>   }
> }
> {code}
> r.getRowtimeAttributeDescriptors throws an NPE because the source here is configured with a proctime attribute only.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
