## Contribution Checklist
  
## What is the purpose of the change

This pull request checks add a is not null precondition to providing a better 
feedback for the user. Without it, Flink cluster throws obscure 
NullPointerException at runtime.

```
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: 
java.lang.RuntimeException
 at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
 at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
Caused by: java.lang.RuntimeException
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110
 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC 
service.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
 at 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
 at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
 at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
```

## Brief changelog
  - *Add notNull precondition check for 
KeyGroupRangeAssignment#assignToKeyGroup*

## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

[ Full content available at: https://github.com/apache/flink/pull/6724 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to