[ https://issues.apache.org/jira/browse/FLINK-18961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177670#comment-17177670 ]

Chesnay Schepler commented on FLINK-18961:
------------------------------------------

This is indeed a bit unintuitive. Realistically, however, it will not be
changed, since the DataSet API is about to be subsumed.

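Note that even if the map function were not chained, your try-catch block would
still not work: returning null does not throw inside the map function, and the
null is only rejected afterwards, when the runtime hands the returned value to
the output collector.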
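To see why, here is a simplified plain-Java model of the call path in the stack trace below. It is not Flink's actual code: `NULL_REJECTING_OUTPUT` is an assumed stand-in for `OutputCollector`, and `chainedMapCollect` and `upstreamFlatMap` are hypothetical stand-ins for `ChainedMapDriver.collect` and the reporter's flatMap. The map function returns first, and only its returned value is checked; chaining merely moves the failure into the upstream flatMap's `out.collect` call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChainedMapModel {

    // Simplified stand-in for Flink's OutputCollector: it rejects null
    // records, mirroring the NullPointerException from the stack trace.
    static final Consumer<String> NULL_REJECTING_OUTPUT = record -> {
        if (record == null) {
            throw new NullPointerException(
                "The system does not support records that are null.");
        }
    };

    // Hypothetical stand-in for ChainedMapDriver.collect: the map function
    // runs first, and only its returned value is handed downstream.
    static void chainedMapCollect(String record,
                                  Function<String, String> mapper,
                                  Consumer<String> output) {
        output.accept(mapper.apply(record));
    }

    // Models the reporter's map: a missing key yields null as a normal
    // return value, so the catch block never runs.
    static String userMap(String value) {
        try {
            Map<String, String> fields = new HashMap<>(); // no "data" key
            return fields.get("data");
        } catch (Exception e) {
            return value;
        }
    }

    // Models the reporter's flatMap. With chaining, out.accept(...) runs the
    // downstream map synchronously, so the NPE is raised from inside this
    // call, which is why the real stack trace points at flatMap.
    static void upstreamFlatMap(String value, Consumer<String> out) {
        out.accept(value);
    }

    // Returns true if pushing one record through the chain throws the NPE.
    static boolean throwsOnNull(String input) {
        Consumer<String> chainedMap = record ->
            chainedMapCollect(record, ChainedMapModel::userMap, NULL_REJECTING_OUTPUT);
        try {
            upstreamFlatMap(input, chainedMap);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsOnNull("\"{}\"")); // prints "true"
    }
}
```

The exception is caught in `throwsOnNull`, two frames above `userMap`, which is the same shape as the real trace (`OutputCollector.collect` called from `ChainedMapDriver.collect` called from `Test$1.flatMap`).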

Essentially, returning null anywhere is undefined behavior, so I'd suggest
explicitly checking for null in your map function.
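A minimal sketch of that explicit null check, kept free of Flink and JSON dependencies so it runs standalone. `extractData` is a hypothetical stand-in for the reporter's `json.getJSONObject("body").getJSONObject("message").getString("data")` chain. In a Flink job, the body of `mapWithFallback` would replace the body of the `map` lambda, and `flatMapSkippingNull` corresponds to turning that `map` into a `flatMap` that calls `out.collect(result)` only when `result` is non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NullSafeMapSketch {

    // Hypothetical stand-in for the JSON lookup: returns null when the
    // "data" field is missing, like getString("data") would.
    static String extractData(String value) {
        String marker = "data=";
        int i = value.indexOf(marker);
        return i < 0 ? null : value.substring(i + marker.length());
    }

    // Option 1: never return null from map; fall back to the input instead.
    static String mapWithFallback(String value) {
        String result = extractData(value);
        return result != null ? result : value;
    }

    // Option 2: flatMap-style; emit nothing for a null result, so no null
    // record ever reaches the downstream collector.
    static void flatMapSkippingNull(String value, Consumer<String> out) {
        String result = extractData(value);
        if (result != null) {
            out.accept(result);
        }
    }

    public static void main(String[] args) {
        System.out.println(mapWithFallback("data=hello")); // prints "hello"
        System.out.println(mapWithFallback("no payload")); // prints "no payload"

        List<String> collected = new ArrayList<>();
        for (String v : List.of("data=a", "missing", "data=b")) {
            flatMapSkippingNull(v, collected::add);
        }
        System.out.println(collected); // prints "[a, b]"
    }
}
```

The fallback variant matches the intent of the original `catch` branch (return the input unchanged); dropping the record instead is an application choice.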

>  In the case of FlatMap linking map, if map returns null, an exception will 
> be thrown in FlatMap
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18961
>                 URL: https://issues.apache.org/jira/browse/FLINK-18961
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.11.0
>         Environment: Mac OS 10.13.6
> Kubernetes 1.16.8
> Flink 1.11.0
>            Reporter: Ryan
>            Priority: Minor
>         Attachments: Lark20200814-173817.png, Lark20200814-173821.png, Lark20200814-173824.png
>
>
> I found a DataSet problem. When a map is chained after a FlatMap and the map
> returns null, an exception is thrown in the FlatMap. I think it's a problem
> with the operator chain. I have attached screenshots of the corresponding
> stack trace.
> {code:java}
> text.filter(value -> value.f0.contains("any")).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
>     @Override
>     public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
>         Pattern pattern = Pattern.compile("\".*\"");
>         Matcher matcher = pattern.matcher(value.f0);
>         if (matcher.find()) {
>             String match = matcher.group(0);
>             out.collect(match); // the exception is thrown here
>         }
>     }
> }).map(value -> {
>     try {
>         String jsonS = value.replace("\"\"", "\"");
>         jsonS = jsonS.substring(1, jsonS.length() - 1);
>         JSONObject json = JSONObject.parseObject(jsonS);
>         String result = json.getJSONObject("body").getJSONObject("message").getString("data");
>         return result; // this is null
>     } catch (Exception e) {
>         return value;
>     }
> }).print();
> {code}
> {code}
> Caused by: java.lang.NullPointerException: The system does not support records that are null. Null values are only supported as fields inside other objects.
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:76)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at com.lemonbox.Test$1.flatMap(Test.java:42)
>     at com.lemonbox.Test$1.flatMap(Test.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:58)
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
