[
https://issues.apache.org/jira/browse/FLINK-20181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther closed FLINK-20181.
--------------------------------
Resolution: Not A Problem
[~yesorno] {{RowData}} and {{StreamTableSource}} don't go well together. I
would recommend using the new table source and sink interfaces, which work
nicely with the new type system. You can find more documentation and
examples here:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
> RowData cannot cast to Tuple2
> -----------------------------
>
> Key: FLINK-20181
> URL: https://issues.apache.org/jira/browse/FLINK-20181
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Xianxun Ye
> Priority: Major
>
> I want to emit CDC data from my own StreamOperator.
> Flink version: 1.11.2, Blink planner.
> {code:java}
> // code placeholder
> getTableEnv().registerTableSource(
>     "source",
>     new StreamTableSource<RowData>() {
>         TableSchema tableSchema = TableSchema.builder()
>             .field("id", new AtomicDataType(new IntType(false)))
>             .field("name", DataTypes.STRING())
>             .field("type", DataTypes.STRING())
>             .primaryKey("id")
>             .build();
>
>         @Override
>         public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
>             return execEnv.addSource(new DebugSourceFunction(tableSchema.toRowDataType()));
>         }
>
>         @Override
>         public TableSchema getTableSchema() {
>             return tableSchema;
>         }
>
>         @Override
>         public DataType getProducedDataType() {
>             return getTableSchema().toRowDataType().bridgedTo(RowData.class);
>         }
>     }
> );
> sql("insert into Test.testdb.animal "
> + " SELECT id, name, type, '2020' as da, '11' as hr"
> + " from source"
> );
> class DebugSourceFunction extends RichParallelSourceFunction<RowData>
>         implements ResultTypeQueryable<RowData> {
>     DataType dataType;
>
>     public DebugSourceFunction(DataType dataType) {
>         this.dataType = dataType;
>     }
>
>     @Override
>     public TypeInformation<RowData> getProducedType() {
>         return (TypeInformation<RowData>) createTypeInformation(dataType);
>     }
>
>     @Override
>     public void run(SourceContext<RowData> ctx) throws Exception {
>         ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1,
>             StringData.fromString("monkey"), StringData.fromString("small")));
>     }
>
>     @Override
>     public void cancel() { }
>
>     public TypeInformation<?> createTypeInformation(DataType producedDataType) {
>         final DataType internalDataType = DataTypeUtils.transform(
>             producedDataType,
>             TypeTransformations.TO_INTERNAL_CLASS);
>         return fromDataTypeToTypeInfo(internalDataType);
>     }
> }
> public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>,
>         OverwritableTableSink, PartitionableTableSink {
>     @Override
>     public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) {
>         DataStream<Void> returnStream = dataStream
>             .map(
>                 (MapFunction<Tuple2<Boolean, RowData>, RowData>)
>                     value -> value.f1
>             )
>             ......
>         return returnStream
>             .addSink(new DiscardingSink<>())
>             .setParallelism(1);
>     }
> }
> {code}
> When I execute the SQL statement `insert into ...`, a ClassCastException is thrown:
> {code:java}
> // code placeholder
> Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at StreamExecCalc$8.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> {code}
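The stack trace above shows the map function receiving a GenericRowData where its signature declares a Tuple2. The following is a minimal plain-Java sketch of how that can compile and still fail at runtime: generic element types are erased, so the mismatch only surfaces when the lambda is invoked. It does not reproduce Flink's planner behavior. FakeRowData, FakeTuple2, and the map helper are hypothetical stand-ins, not Flink classes or APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ErasureCastDemo {
    // Hypothetical stand-in for a row object; NOT Flink's GenericRowData.
    static final class FakeRowData {
        final Object[] fields;
        FakeRowData(Object... fields) { this.fields = fields; }
    }

    // Hypothetical stand-in for a two-field tuple; NOT Flink's Tuple2.
    static final class FakeTuple2<A, B> {
        final A f0;
        final B f1;
        FakeTuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Mimics a map operator: the input type I exists only at compile time.
    @SuppressWarnings("unchecked")
    static <I, O> List<O> map(List<?> rawElements, Function<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (Object element : rawElements) {
            // Unchecked cast: erased, so nothing is verified on this line.
            out.add(fn.apply((I) element));
        }
        return out;
    }

    // Feeds a row into a function that was declared to take a tuple.
    static String run() {
        List<Object> elements = new ArrayList<>();
        elements.add(new FakeRowData(1, "monkey", "small"));
        Function<FakeTuple2<Boolean, FakeRowData>, FakeRowData> extract = t -> t.f1;
        try {
            map(elements, extract);
            return "no exception";
        } catch (ClassCastException e) {
            // The cast to FakeTuple2 happens when the lambda is invoked.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the failure surfaces inside the function call rather than where the stream is declared, which matches the position of StreamMap.processElement at the top of the trace.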
--
This message was sent by Atlassian Jira
(v8.3.4#803005)