Xianxun Ye created FLINK-20181:
----------------------------------

             Summary: RowData cannot cast to Tuple2
                 Key: FLINK-20181
                 URL: https://issues.apache.org/jira/browse/FLINK-20181
             Project: Flink
          Issue Type: Bug
            Reporter: Xianxun Ye


I want to emit CDC data from my own StreamOperator.

Flink version: 1.11.2, Blink planner.
{code:java}

// code placeholder
  // Registers an inline table source under the name "source" and then runs an
  // INSERT that reads from it. The source declares RowData as its produced class.
  getTableEnv().registerTableSource(
      "source",
      new StreamTableSource<RowData>() {

        /** Schema with three fields (id, name, type); "id" is declared as the primary key. */
        private final TableSchema tableSchema =
            TableSchema.builder()
                .field("id", new AtomicDataType(new IntType(false)))
                .field("name", DataTypes.STRING())
                .field("type", DataTypes.STRING())
                .primaryKey("id")
                .build();

        /** Adds a {@code DebugSourceFunction}, built from the schema's row data type, as the source. */
        @Override
        public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
          final DataType rowType = tableSchema.toRowDataType();
          final DebugSourceFunction sourceFunction = new DebugSourceFunction(rowType);
          return execEnv.addSource(sourceFunction);
        }

        /** Returns the schema declared above. */
        @Override
        public TableSchema getTableSchema() {
          return tableSchema;
        }

        /** Returns the schema's row data type bridged to {@code RowData}. */
        @Override
        public DataType getProducedDataType() {
          final DataType rowType = getTableSchema().toRowDataType();
          return rowType.bridgedTo(RowData.class);
        }
      });

  // Statement whose execution produces the reported ClassCastException.
  sql(
      "insert into Test.testdb.animal "
          + " SELECT id, name, type, '2020' as da, '11' as hr"
          + " from source");

/**
 * Parallel source used to reproduce the issue: each {@link #run} call emits a single
 * INSERT row {@code (1, "monkey", "small")} and then returns.
 *
 * <p>The produced type information is derived from the {@code DataType} handed to the
 * constructor.
 */
class DebugSourceFunction extends RichParallelSourceFunction<RowData>
    implements ResultTypeQueryable<RowData> {

  /** Data type the produced type information is derived from. */
  final DataType dataType;

  /**
   * @param dataType data type describing the rows this source emits
   */
  public DebugSourceFunction(DataType dataType) {
    this.dataType = dataType;
  }

  /** Returns the type information computed by {@link #createTypeInformation(DataType)}. */
  @Override
  public TypeInformation<RowData> getProducedType() {
    // The helper returns a wildcard type; the cast mirrors what the source declares.
    @SuppressWarnings("unchecked")
    final TypeInformation<RowData> typeInfo =
        (TypeInformation<RowData>) createTypeInformation(dataType);
    return typeInfo;
  }

  /** Emits one INSERT-kind row with an int and two string fields. */
  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    final RowData row =
        GenericRowData.ofKind(
            RowKind.INSERT,
            1,
            StringData.fromString("monkey"),
            StringData.fromString("small"));
    ctx.collect(row);
  }

  /** Nothing to cancel. */
  @Override
  public void cancel() {
  }

  /**
   * Transforms the given data type with {@code TypeTransformations.TO_INTERNAL_CLASS} and
   * converts the result to type information.
   *
   * @param producedDataType data type to convert
   * @return type information for the transformed data type
   */
  public TypeInformation<?> createTypeInformation(DataType producedDataType) {
    final DataType internalType =
        DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
    return fromDataTypeToTypeInfo(internalType);
  }
}

/**
 * Sink used in the reproduction. Only {@code consumeDataStream} is shown in the report;
 * the remaining interface methods and part of the pipeline are elided ("......").
 */
public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, 
OverwritableTableSink, PartitionableTableSink {
     /**
      * Maps every {@code Tuple2<Boolean, RowData>} element to its {@code f1} row and,
      * after the elided steps, attaches a {@code DiscardingSink} with parallelism 1.
      */
     @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
RowData>> dataStream) {
      
      DataStream<Void> returnStream = dataStream
          // NOTE(review): the reported ClassCastException is thrown in
          // StreamMap.processElement, presumably for this map: the lambda expects
          // Tuple2 elements but GenericRowData arrives - confirm against the planner.
          .map(
              (MapFunction<Tuple2<Boolean, RowData>, RowData>)
                  value -> value.f1
          )
          // The "......" below is an elision made by the reporter, not real code.
          ......      return returnStream
          .addSink(new DiscardingSink<>())
          .setParallelism(1);
    }
  }
{code}
When I execute the SQL statement `insert into ...`, a ClassCastException is thrown:
{code:java}
// code placeholder
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.GenericRowData cannot be cast to 
org.apache.flink.api.java.tuple.Tuple2
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at StreamExecCalc$8.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to