[ 
https://issues.apache.org/jira/browse/FLINK-20181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-20181.
--------------------------------
    Resolution: Not A Problem

[~yesorno] {{RowData}} and {{StreamTableSource}} don't go well together. I 
would recommend to use the new table source and sink interfaces that work 
nicely with the new type system. Here you can find more documentation and 
examples:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html

> RowData cannot cast to Tuple2
> -----------------------------
>
>                 Key: FLINK-20181
>                 URL: https://issues.apache.org/jira/browse/FLINK-20181
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Xianxun Ye
>            Priority: Major
>
> I want to emit CDC data by my own StreamOperator.
> flink version :1.11.2, blink planner.
> {code:java}
> //代码占位符
>     getTableEnv().registerTableSource(
>         "source",
>         new StreamTableSource<RowData>() {
>           TableSchema tableSchema = TableSchema.builder()
>               .field("id", new AtomicDataType(new IntType(false)))
>               .field("name", DataTypes.STRING())
>               .field("type", DataTypes.STRING())
>               .primaryKey("id")
>               .build();          @Override
>           public DataStream<RowData> getDataStream(StreamExecutionEnvironment 
> execEnv) {
>             return execEnv.addSource(new 
> DebugSourceFunction(tableSchema.toRowDataType()));
>           }          @Override
>           public TableSchema getTableSchema() {
>             return tableSchema;
>           }          @Override
>           public DataType getProducedDataType() {
>             return getTableSchema().toRowDataType().bridgedTo(RowData.class);
>           }
>         }
>     );
>     sql("insert into Test.testdb.animal "
>             + " SELECT id, name, type, '2020' as da, '11' as hr"
>             + " from source"
>     );  
> class DebugSourceFunction extends RichParallelSourceFunction<RowData> 
> implements ResultTypeQueryable<RowData> {           
> DataType dataType;    
>     public DebugSourceFunction(DataType dataType) {
>       this.dataType = dataType;
>     }    
>     @Override
>     public TypeInformation<RowData> getProducedType() {
>       return (TypeInformation<RowData>) createTypeInformation(dataType);
>     }    
>     @Override
>     public void run(SourceContext<RowData> ctx) throws Exception {
>       ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, 
> StringData.fromString("monkey"), StringData.fromString("small")));
>     }    
>     @Override
>     public void cancel() {    }    public TypeInformation<?> 
> createTypeInformation(DataType producedDataType) {
>       final DataType internalDataType = DataTypeUtils.transform(
>           producedDataType,
>           TypeTransformations.TO_INTERNAL_CLASS);
>       return fromDataTypeToTypeInfo(internalDataType);
>     }
>   }  
> public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, 
> OverwritableTableSink, PartitionableTableSink {
>      @Override
>     public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
> RowData>> dataStream) {
>       
>       DataStream<Void> returnStream = dataStream
>           .map(
>               (MapFunction<Tuple2<Boolean, RowData>, RowData>)
>                   value -> value.f1
>           )
>           ......      
>       return returnStream
>           .addSink(new DiscardingSink<>())
>           .setParallelism(1);
>     }
>   }
> {code}
> when I execute sql with `insert into ...`, occurs class cast fail exception:
> {code:java}
> //代码占位符
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.table.data.GenericRowData cannot be cast to 
> org.apache.flink.api.java.tuple.Tuple2Caused by: 
> java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData 
> cannot be cast to org.apache.flink.api.java.tuple.Tuple2 at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  at StreamExecCalc$8.processElement(Unknown Source) at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to