ChenDonghe created FLINK-9025:
---------------------------------

             Summary: Problem with convert String to Row
                 Key: FLINK-9025
                 URL: https://issues.apache.org/jira/browse/FLINK-9025
             Project: Flink
          Issue Type: Bug
          Components: Java API
            Reporter: ChenDonghe


when we convert json string into Row, and set type of filed in the row, code 
below:
{code:java}
// code placeholder
DataStream<String> stream = env.addSource(consumer.getInstance(sourceTopic, new 
SimpleStringSchema()).setStartFromLatest());

DataStream<Row> dataStreamRow = stream.map(new 
ConvertDataStream()).returns(typeinfo);
{code}
this convert process calls the copy function in RowSerializer class, code below:

 
{code:java}
// public final class RowSerializer extends TypeSerializer<Row>

@Override
public Row copy(Row from) {
   int len = fieldSerializers.length;

   if (from.getArity() != len) {
      throw new RuntimeException("Row arity of from does not match 
serializers.");
   }

   Row result = new Row(len);
   for (int i = 0; i < len; i++) {
      Object fromField = from.getField(i);
      if (fromField != null) {
         Object copy = fieldSerializers[i].copy(fromField);
         result.setField(i, copy);
      } else {
         result.setField(i, null);
      }
   }
   return result;
}
{code}
the json string type message from kafka convert to the row type, in this 
process, RowSerrializer copy the from-row to a new result-row, but the type of 
result-row filed is Object type, for example, the first message from kafka 
filed0 is Integer type, the second message from kafka filed0 is Long type, if 
we set the filed0 is Long, we hope Integer can be compatible, but we got an 
exception : can not convert Interger to Long, so we hope RowSerrializer copy 
function can be more flexible, it can be act as a table, we can insert an 
Integer type value into Long type filed.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to