[ 
https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14642:
--------------------------------
    Description: 
Currently, TupleSerializer and CaseClassSerializer do not support serializing 
NULL values, which I think is acceptable. But their lack of support for copying 
NULL values causes the following code to throw an exception, which I think does 
not match users' expectations and is error-prone.

*code:*
{code:java}
// The map function returns a Tuple and may return null.
stream.map(xxx).filter(_ != null).xxx
{code}
 

*exception info:*

 
{code:java}
Caused by: java.lang.NullPointerException
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
 

*suggestion:*

Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle 
NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // Pass NULL through unchanged instead of dereferencing it below.
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  // Copy each field with its own field serializer.
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}
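
To illustrate the failure mode outside of Flink, here is a minimal, self-contained Java sketch. All names in it (`NullSafeCopyDemo`, `Pair`, `copyUnguarded`, `copyNullSafe`, `pushToFilter`) are hypothetical and are not Flink APIs. `pushToFilter` only loosely models the copying hand-off suggested by the `pushToOperator` frame in the stack trace, on the assumption that the record is copied before the downstream filter sees it.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical illustration only; none of these names are Flink APIs.
final class NullSafeCopyDemo {

    // Stand-in for a two-field tuple or case class.
    static final class Pair {
        final String left;
        final Integer right;

        Pair(String left, Integer right) {
            this.left = left;
            this.right = right;
        }
    }

    // Models a copy without a null check: dereferencing a null record
    // throws a NullPointerException.
    static Pair copyUnguarded(Pair from) {
        return new Pair(from.left, from.right);
    }

    // Models the suggested change: a null record is returned as-is.
    static Pair copyNullSafe(Pair from) {
        if (from == null) {
            return null;
        }
        return new Pair(from.left, from.right);
    }

    // Assumed model of a copying hand-off between chained operators:
    // the record is copied first, then the downstream filter inspects the copy.
    static boolean pushToFilter(Pair record, UnaryOperator<Pair> copier, Predicate<Pair> filter) {
        return filter.test(copier.apply(record));
    }

    public static void main(String[] args) {
        Predicate<Pair> notNull = r -> r != null;

        // With the guard, the null reaches the filter and is dropped there.
        System.out.println(pushToFilter(null, NullSafeCopyDemo::copyNullSafe, notNull));

        // Without the guard, the copy fails before the filter ever runs.
        try {
            pushToFilter(null, NullSafeCopyDemo::copyUnguarded, notNull);
        } catch (NullPointerException e) {
            System.out.println("copy threw NullPointerException");
        }
    }
}
```

Under these assumptions, the `filter(_ != null)` in the user code can only do its job if the copy step tolerates null, which is the point of the suggested guard.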
 
  was:
Currently, TupleSerializer and CaseClassSerializer do not support serializing 
NULL values, which I think is acceptable. But their lack of support for copying 
NULL values causes the following code to throw an exception, which I think does 
not match users' expectations.

*code:*

{code:java}
// The map function returns a Tuple and may return null.
stream.map(xxx).filter(_ != null).xxx
{code}
 

*exception info:*

 
{code:java}
Caused by: java.lang.NullPointerException
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
 

*suggestion:*

Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle 
NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values.
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}
 

> Flink TupleSerializer and CaseClassSerializer should support copy NULL values
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-14642
>                 URL: https://issues.apache.org/jira/browse/FLINK-14642
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.9.1
>            Reporter: Victor Wong
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
