A null check at this point breaks the current logic of the CoGroup/Join classes. 
CoGroup has no null checks directly in its `evictor`/`trigger` methods and 
instead validates during delegation:
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L344

I do the null check there for the `allowedLateness` field as well.
At the same time, Join.apply delegates to CoGroup.apply:
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java#L314
 

To be consistent, we would also have to add null checks for evictor and trigger. 
Adding null checks directly in the setters/constructor breaks the chain of calls 
in the `apply` methods (for evictor/trigger/allowedLateness) and requires a 
clumsy if-else condition for each nullable field separately.

Both CoGroup and Join allow null for trigger/evictor (and I've added 
allowedLateness following the same approach) but don't pass validation during 
calls to `apply(...)`.
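
A minimal sketch of the pattern I mean, using hypothetical stand-in classes rather than the real Flink ones. `InnerWindow` plays the role of WindowedStream and `OuterWithWindow` the role of the CoGroup/Join builder; the `String` trigger, the `Duration` lateness, and the default values are assumptions made only for illustration. The outer builder accepts null as "not set by the user" and forwards only non-null settings in `apply`, so the inner class keeps its own defaults and its own validation.

```java
import java.time.Duration;
import java.util.Objects;

public class NullableWindowSettingsSketch {

    /** Hypothetical stand-in for WindowedStream: private defaults, rejects null. */
    static final class InnerWindow {
        private String trigger = "default-trigger";       // assumed default, illustration only
        private Duration allowedLateness = Duration.ZERO; // assumed default, illustration only

        InnerWindow trigger(String trigger) {
            this.trigger = Objects.requireNonNull(trigger, "trigger");
            return this;
        }

        InnerWindow allowedLateness(Duration lateness) {
            this.allowedLateness = Objects.requireNonNull(lateness, "allowedLateness");
            return this;
        }

        String getTrigger() { return trigger; }
        Duration getAllowedLateness() { return allowedLateness; }
    }

    /** Hypothetical stand-in for the CoGroup/Join builder: null means "not set". */
    static final class OuterWithWindow {
        private final String trigger;           // may be null
        private final Duration allowedLateness; // may be null

        OuterWithWindow(String trigger, Duration allowedLateness) {
            // No requireNonNull here: the initial builder is created with all-null
            // settings, and each setter carries the other (possibly null) field over.
            this.trigger = trigger;
            this.allowedLateness = allowedLateness;
        }

        OuterWithWindow trigger(String newTrigger) {
            return new OuterWithWindow(newTrigger, allowedLateness);
        }

        OuterWithWindow allowedLateness(Duration newLateness) {
            return new OuterWithWindow(trigger, newLateness);
        }

        InnerWindow apply() {
            InnerWindow inner = new InnerWindow();
            // Forward only what the user set; unset fields keep the inner defaults.
            if (trigger != null) {
                inner.trigger(trigger);
            }
            if (allowedLateness != null) {
                inner.allowedLateness(allowedLateness);
            }
            return inner;
        }
    }

    public static void main(String[] args) {
        // Nothing specified by the user: inner defaults apply.
        InnerWindow defaults = new OuterWithWindow(null, null).apply();
        System.out.println(defaults.getTrigger() + " / " + defaults.getAllowedLateness());

        // Only allowedLateness specified: trigger stays null in the builder.
        InnerWindow custom = new OuterWithWindow(null, null)
                .allowedLateness(Duration.ofSeconds(5))
                .apply();
        System.out.println(custom.getTrigger() + " / " + custom.getAllowedLateness());
    }
}
```

If the `OuterWithWindow` constructor called `Objects.requireNonNull` on each field, both calls in `main` would fail immediately, because the builder starts out with every optional setting unset.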

With a null check for `allowedLateness` inside the setter, we get errors 
in the following scenario:
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala#L151

It breaks when the user doesn't specify any of trigger/evictor/allowedLateness. At 
the same time, these fields are optional and have defaults in WindowedStream. 
Unfortunately, the default for allowedLateness in WindowedStream has a private 
modifier (and I don't think it's good practice to silently apply a default when 
the user passed null by mistake).

Please let me know your thoughts.

[ Full content available at: https://github.com/apache/flink/pull/6646 ]
