[ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17361370#comment-17361370
 ] 

Jark Wu commented on FLINK-22968:
---------------------------------

Hi [~DaChun777], thanks for reporting this, but this is not a bug. Flink does 
support consuming a DataStream with an atomic type, e.g. String. However, 
{{toAppendStream()}} only supports {{Row}} or POJO as the result type. This is 
documented in the Javadoc: 

{code}
  /**
    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
    *
    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
    * by update or delete changes, the conversion will fail.
    *
    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
    * - [[Row]] and Scala Tuple types: Fields are mapped by position, field
    *   types must match.
    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
    *
    * @param table The [[Table]] to convert.
    * @tparam T The type of the resulting [[DataStream]].
    * @return The converted [[DataStream]].
    */
  def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
{code}

You can update your code to use {{Row}} as the output type, i.e. 
{{tableEnvironment.toAppendStream[Row](res).print()}}, and the job should then 
be able to run. 
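A minimal sketch of the suggested fix (the object name and the element values are illustrative; {{env}}, {{tableEnv}}, and the attached test.scala are not reproduced here, so this only shows the shape of the conversion):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object ToAppendStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Consuming a DataStream with an atomic type (String) is supported.
    val stream: DataStream[String] = env.fromElements("a", "b", "c")
    val table = tableEnv.fromDataStream(stream)

    // Convert back using Row as the result type instead of String,
    // since toAppendStream only supports Row or POJO result types.
    tableEnv.toAppendStream[Row](table).print()

    env.execute("to-append-stream-example")
  }
}
```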



>  String type cannot be used when creating Flink SQL
> ---------------------------------------------------
>
>                 Key: FLINK-22968
>                 URL: https://issues.apache.org/jira/browse/FLINK-22968
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.3, 1.13.1
>         Environment: {color:#FF0000}*Flink-1.13.1 and Flink-1.12.1*{color}
>            Reporter: DaChun
>            Priority: Blocker
>         Attachments: test.scala, this_error.txt
>
>
> When I run the program, the error is as follows (the full stack trace is in 
> this_error.txt):
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> The error message itself prompted me to file an issue. The program simply 
> reads a stream and converts it into a table; the element type is String. The 
> code is in test.scala and the error is in this_error.txt.
> However, if I create an entity class myself and use that class as the element 
> type, or use the Row type, it works fine. Do I have to create my own entity 
> class, or use Row, instead of the String type?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
