[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934071#comment-15934071
 ] 

Luke Hutchison commented on FLINK-6115:
---------------------------------------

It's quite easy to end up with null values in tuples though -- it's currently 
entirely valid to store null values in tuples, they just can't be serialized -- 
and you can't always predict when tuples will be serialized. For example (I 
suspect) code may work fine for a while in RAM on a single machine, but then 
break suddenly when you scale it up to run on a cluster, or even when the 
runtime decides to spill to disk. This is very poor behavior.
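
To illustrate the deferred failure described above, here is a minimal sketch. The classes `Pair` and `StrictPairSerializer` are made-up stand-ins (assumptions for illustration), not Flink's `Tuple2` or `TupleSerializer`; only the exception message is taken from the stack trace in this issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a two-field tuple; NOT Flink's Tuple2.
final class Pair {
    String f0;
    String f1;

    Pair(String f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical serializer that rejects null fields, mimicking the failure mode in this issue.
final class StrictPairSerializer {
    static byte[] serialize(Pair p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeField(out, p.f0);
            writeField(out, p.f1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private static void writeField(DataOutputStream out, String value) throws IOException {
        if (value == null) {
            // Same message as in the reported stack trace.
            throw new IllegalArgumentException("The record must not be null.");
        }
        out.writeUTF(value);
    }
}

final class DeferredFailureDemo {
    public static void main(String[] args) {
        Pair p = new Pair("key", null);      // accepted without complaint
        System.out.println(p.f0 + "/" + p.f1); // purely in-memory use works

        try {
            StrictPairSerializer.serialize(p); // fails only here, far from the cause
        } catch (IllegalArgumentException e) {
            System.out.println("failed at serialization: " + e.getMessage());
        }
    }
}
```

The null is stored on the first line of `main`, but nothing complains until the serializer runs, which in a real job could be in a different operator or on a different machine.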

Even worse though is that it's exceedingly hard to tell where the problem is 
caused, as shown in the stack trace I posted. Not only is the location where 
the null value is set separated from the location where the problem is 
triggered on serialization, but the serialization trace doesn't tell you 
anything about where in the program the serializer was running, other than what 
operation type it was contained within.

Another common scenario in which null values get set in tuples is doing an 
outer join. Basically if the Flink policy is "we won't support nulls in tuples 
as valid, ever", then you should not be able to produce a tuple as a result of 
an outer join. More generally, you should simply throw an exception when the 
constructor of a tuple is called with a null parameter, so that the user is 
notified immediately of the invalid behavior, with the exception tied directly 
to where the null value was set. This would not be a perfect fix though, since 
the fields of a tuple are not final, so it is still possible to set the field 
values to null directly.
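
A rough sketch of the fail-fast constructor idea, and of why it is incomplete. `CheckedPair` is a hypothetical class written for this comment, not a proposal for the actual `Tuple2` source; it only assumes public, non-final fields like those of Flink's tuples.

```java
import java.util.Objects;

// Hypothetical tuple-like class; NOT Flink's Tuple2.
final class CheckedPair<A, B> {
    // Deliberately public and non-final, like the tuple fields discussed above.
    public A f0;
    public B f1;

    CheckedPair(A f0, B f1) {
        // Fail immediately, so the stack trace points at the code that passed the null.
        this.f0 = Objects.requireNonNull(f0, "f0 must not be null");
        this.f1 = Objects.requireNonNull(f1, "f1 must not be null");
    }
}

final class FailFastDemo {
    public static void main(String[] args) {
        try {
            new CheckedPair<String, String>("key", null);
        } catch (NullPointerException e) {
            System.out.println("rejected at construction: " + e.getMessage());
        }

        // The check is easy to bypass because the fields are not final.
        CheckedPair<String, String> p = new CheckedPair<>("key", "value");
        p.f1 = null;
        System.out.println("after direct assignment: " + p.f1);
    }
}
```

The constructor check gives an exception whose trace includes the user's own code, but the direct field assignment at the end slips through unnoticed, which is the gap mentioned above.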

I don't see any of these as good solutions to this issue. Really the best thing 
to do is find a way to efficiently serialize null values in tuples. Why exactly 
is it slower to support serializing null values in tuples than it is for a POJO 
or a {{Row}} object?

In theory, if you simply ran tuples through the POJO serializer, it should be 
able to serialize them fine, with the same efficiency that it can serialize 
regular POJOs (which are allowed to contain null values) -- so I don't see how 
or why this would incur a performance penalty.
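
For reference, one general way to make null fields serializable is to write a presence flag before each field. The sketch below shows that technique with made-up classes (`NullablePair`, `NullAwarePairSerializer`); it is an assumption about the general approach, not a description of how Flink's POJO or `Row` serializers are actually implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a two-field tuple; NOT Flink's Tuple2.
final class NullablePair {
    String f0;
    String f1;

    NullablePair(String f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical serializer that prefixes every field with a one-byte presence flag.
final class NullAwarePairSerializer {
    static byte[] serialize(NullablePair p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeNullable(out, p.f0);
            writeNullable(out, p.f1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static NullablePair deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String f0 = readNullable(in);
            String f1 = readNullable(in);
            return new NullablePair(f0, f1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeNullable(DataOutputStream out, String value) throws IOException {
        out.writeBoolean(value != null); // presence flag: one byte per field
        if (value != null) {
            out.writeUTF(value);
        }
    }

    private static String readNullable(DataInputStream in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }
}

final class NullAwareDemo {
    public static void main(String[] args) {
        byte[] data = NullAwarePairSerializer.serialize(new NullablePair("key", null));
        NullablePair copy = NullAwarePairSerializer.deserialize(data);
        System.out.println(copy.f0 + "/" + copy.f1 + " in " + data.length + " bytes");
    }
}
```

In this sketch the overhead is one extra byte and one branch per field; a bitmask covering all fields would shrink the size overhead further. Whether that cost matters for tuples in practice is exactly the question raised above.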

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-6115
>                 URL: https://issues.apache.org/jira/browse/FLINK-6115
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>       at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>       at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>       at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>       at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>       at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>       at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>       at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>       at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>       at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them in my code). Surely there's a way to pull 
> out the source file name and line number from the program DAG node when 
> errors like this occur?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
