[
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-10380:
-----------------------------------
Labels: pull-request-available (was: )
> Check if key is not null before assigning to group in KeyedStream
> -----------------------------------------------------------------
>
> Key: FLINK-10380
> URL: https://issues.apache.org/jira/browse/FLINK-10380
> Project: Flink
> Issue Type: Task
> Affects Versions: 1.6.0
> Reporter: Sayat Satybaldiyev
> Priority: Minor
> Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions it by a key that might be
> null, the Flink job throws a NullPointerException at runtime. However, the
> NPE that Flink throws is hard to debug and understand because it does not
> point to any location in the user's Flink job.
> *Suggestion:*
> Add a precondition that checks that the key is not null and throws a
> descriptive error if it is.
>
> *Job Example*:
>
> {code:java}
> DataStream<String> stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>     .map(x -> (String) null)
>     .keyBy(x -> x);
> {code}
>
>
> The error that is thrown:
>
> {code:java}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException:
> java.lang.RuntimeException
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
> at
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
> at
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
> at
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ... 10 more
> {code}
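
The precondition suggested in the issue could look roughly like the following. This is a minimal, standalone sketch and not Flink's actual code: the class name `KeyGroupAssignmentSketch`, the error message text, and the simplified `Math.floorMod` hashing are all assumptions made for illustration. Only the method name `assignToKeyGroup` is taken from the stack trace above.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for a key-group assignment routine.
 * It illustrates the fail-fast null check only; it is not Flink's implementation.
 */
final class KeyGroupAssignmentSketch {

    private KeyGroupAssignmentSketch() {}

    /**
     * Maps a key to a key group in the range [0, maxParallelism).
     * Throws a NullPointerException with a descriptive message if the key is null.
     */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // Proposed precondition: name the problem instead of failing on key.hashCode().
        // The message wording is an assumption, not taken from the issue.
        Objects.requireNonNull(
                key, "Assigned key must not be null! Check the KeySelector passed to keyBy().");
        // Simplified hashing (assumption): floorMod keeps the result non-negative.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        // A non-null key is assigned to some group in [0, 128).
        System.out.println(assignToKeyGroup("aaa", 128));
        try {
            assignToKeyGroup(null, 128);
        } catch (NullPointerException e) {
            // The message now says what went wrong and where to look.
            System.out.println(e.getMessage());
        }
    }
}
```

With a check of this kind, the innermost "Caused by" entry would carry a message that points at the null key rather than a bare NullPointerException.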
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)