[ 
https://issues.apache.org/jira/browse/FLINK-30448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654893#comment-17654893
 ] 

Gen Luo commented on FLINK-30448:
---------------------------------

Hi Yong, 
I looked into the issue and found that this may be a limitation of the Java
lambda mechanism.
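{code:java}
import java.io.Serializable;
import java.util.Arrays;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.util.InstantiationUtil;

class Test implements Serializable {

    // For the non-static variant, remove `static` here and use a::nonNull below.
    static boolean nonNull(Object x) {
        return x != null;
    }

    public static void main(String[] args) throws Exception {
        Test a = new Test(); // only used by the non-static variant (a::nonNull)
        FilterFunction<Object> f1 = Test::nonNull;
        FilterFunction<Object> f2 = Test::nonNull;
        // Same instance before serialization?
        System.out.println(f1 == f2);

        byte[] b1 = InstantiationUtil.serializeObject(f1);
        byte[] b2 = InstantiationUtil.serializeObject(f2);
        // Identical serialized bytes?
        System.out.println(Arrays.equals(b1, b2));

        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        FilterFunction<Object> df1 = InstantiationUtil.deserializeObject(b1, cl);
        FilterFunction<Object> df2 = InstantiationUtil.deserializeObject(b2, cl);
        // Same instance after deserialization?
        System.out.println(df1 == df2);
    }
}
{code}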
Running the code above with nonNull as a static method of Test prints
(false, true, true).
If nonNull is made a non-static method (the references then have to be
written as a::nonNull), the output is (false, true, false).

Looking into the internal calls, I found that deserializing a lambda first
produces a SerializedLambda. In both cases the two SerializedLambda objects are
distinct, but a SerializedLambda is only an intermediate form: it still has to
be converted back into a FilterFunction. To do this, the synthetic method
"$deserializeLambda$" of the capturing class (Test here) is invoked with the
SerializedLambda during deserialization. If the method is non-static, this
yields two different instances; if the method is static, it returns exactly
the same instance. So when Flink calls that filter instance with two
different types of data (Child1 and Child2), the second call causes a
ClassCastException.
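A minimal plain-JDK sketch of the same experiment, without Flink, may make this easier to reproduce. It assumes a hypothetical serializable interface SerFilter standing in for FilterFunction, and round-trips the functions through java.io object streams instead of InstantiationUtil. Two references to a static method should come out as distinct instances before the round trip and as one shared instance after it, while two references bound to an instance stay distinct.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaDeserDemo implements Serializable {

    // Hypothetical stand-in for Flink's FilterFunction: a serializable single-method interface.
    @FunctionalInterface
    interface SerFilter<T> extends Serializable {
        boolean filter(T value);
    }

    static boolean nonNullStatic(Object x) {
        return x != null;
    }

    boolean nonNullInstance(Object x) {
        return x != null;
    }

    // Serializes and deserializes with plain java.io streams (instead of InstantiationUtil).
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // {same instance before round trip, same instance after round trip}
    // for two references to a static method (nothing captured).
    static boolean[] staticCase() {
        SerFilter<Object> f1 = LambdaDeserDemo::nonNullStatic;
        SerFilter<Object> f2 = LambdaDeserDemo::nonNullStatic;
        return new boolean[] {f1 == f2, roundTrip(f1) == roundTrip(f2)};
    }

    // Same pair of checks for references bound to an instance (receiver captured).
    static boolean[] instanceCase() {
        LambdaDeserDemo a = new LambdaDeserDemo();
        SerFilter<Object> f1 = a::nonNullInstance;
        SerFilter<Object> f2 = a::nonNullInstance;
        return new boolean[] {f1 == f2, roundTrip(f1) == roundTrip(f2)};
    }

    public static void main(String[] args) {
        boolean[] s = staticCase();
        boolean[] i = instanceCase();
        System.out.println("static method:   before=" + s[0] + ", after=" + s[1]);
        System.out.println("instance method: before=" + i[0] + ", after=" + i[1]);
    }
}
{code}

This should mirror the first and last values of the (false, true, true) and (false, true, false) results above; the byte comparison is left out.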

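On the other hand, if the FilterFunctions are written as `x->x!=null`, each
lambda expression gets its own synthetic implementation method and therefore
its own lambda class, so the deserialized functions stay distinct and
everything works.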
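One way to check the "separate implementation method" point is to ask each function for its SerializedLambda form and compare getImplMethodName(). The sketch below assumes the same kind of hypothetical SerFilter interface and reads that form through the private writeReplace() method the JDK generates on serializable lambda classes (an implementation detail, reached here via reflection). Two Objects::nonNull references should both report nonNull, while the two x->x!=null lambdas should report two different synthetic names; the exact names are chosen by javac.

{code:java}
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Objects;

public class LambdaImplNames {

    // Hypothetical stand-in for Flink's FilterFunction.
    @FunctionalInterface
    interface SerFilter<T> extends Serializable {
        boolean filter(T value);
    }

    // Serializable lambda classes carry a private writeReplace() that returns
    // their SerializedLambda form; it is called reflectively here.
    static SerializedLambda serialForm(Serializable fn) {
        try {
            Method writeReplace = fn.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(fn);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("not a serializable lambda", e);
        }
    }

    // Two method references to the same static method.
    static String[] methodRefImplNames() {
        SerFilter<Object> f1 = Objects::nonNull;
        SerFilter<Object> f2 = Objects::nonNull;
        return new String[] {
            serialForm(f1).getImplMethodName(), serialForm(f2).getImplMethodName()
        };
    }

    // Two textually identical lambda expressions.
    static String[] lambdaImplNames() {
        SerFilter<Object> f1 = x -> x != null;
        SerFilter<Object> f2 = x -> x != null;
        return new String[] {
            serialForm(f1).getImplMethodName(), serialForm(f2).getImplMethodName()
        };
    }

    public static void main(String[] args) {
        System.out.println("Objects::nonNull -> " + String.join(", ", methodRefImplNames()));
        System.out.println("x -> x != null   -> " + String.join(", ", lambdaImplNames()));
    }
}
{code}

Since $deserializeLambda$ dispatches on these names, identical names for the method references are what lets both deserialize through the same code path.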

Unfortunately, I could find little documentation about the $deserializeLambda$
method. My guess is that it applies an optimization that returns a singleton
instance for a static-method lambda. That works well for non-generic classes
but fails here.
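For what it's worth, the singleton behaviour guessed at above can be observed without serialization. On OpenJDK, an invokedynamic call site for a lambda or method reference that captures nothing is linked to one cached instance, while a capturing one (such as a bound receiver::method reference) allocates a new object each time it is evaluated. The $deserializeLambda$ method also produces its result through such a call site, which would be consistent with the shared instance. The sketch below (class and method names are hypothetical) evaluates each kind of expression twice from the same call site, using java.util.function.Predicate for brevity. This is OpenJDK implementation behaviour, not something the JLS guarantees.

{code:java}
import java.util.Objects;
import java.util.function.Predicate;

public class CallSiteCaching {

    boolean check(Object x) {
        return x != null;
    }

    // One method-reference expression, so one call site; it captures nothing,
    // so OpenJDK links it to a single cached instance.
    static Predicate<Object> nonCapturing() {
        return Objects::nonNull;
    }

    // Also one call site, but it captures the receiver, so each evaluation
    // allocates a fresh instance holding that receiver.
    static Predicate<Object> capturing(CallSiteCaching receiver) {
        return receiver::check;
    }

    public static void main(String[] args) {
        CallSiteCaching r = new CallSiteCaching();
        System.out.println("non-capturing shared: " + (nonCapturing() == nonCapturing()));
        System.out.println("capturing shared:     " + (capturing(r) == capturing(r)));
    }
}
{code}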

In short, I believe this is a limitation of the Java lambda mechanism that
Flink cannot work around. To avoid the exception, use the `x->x!=null`
pattern or a non-static method.

> "filter(Objects::nonNull)" will bring down task with failure cause: 
> ClassCastException
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-30448
>                 URL: https://issues.apache.org/jira/browse/FLINK-30448
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>         Environment: tested on both Flink 1.15.1 and Flink 1.16.0
> IntelliJ IDEA dev environment run
>            Reporter: Yong
>            Priority: Major
>         Attachments: TestSideOutput.java
>
>
> Attached is an *all-in-one* Java program that can run locally in a dev
> environment (e.g. IntelliJ IDEA -> Run). It consumes objects from an elements
> stream; each object is a parent with two child fields (Child1 and Child2). I
> use *side outputs* to map and split the stream into two sub-streams, one per
> child, and I put '{*}filter(Objects::nonNull){*}' on each sub-stream to
> ignore null objects. When a parent record from the stream has
> {*}either child set to null{*}, the program brings down the task with the
> following error:
> ......
> switched from RUNNING to FAILED with failure cause: 
> java.lang.{*}ClassCastException{*}: mytest.TestSideOutput$Child2 cannot be 
> cast to mytest.TestSideOutput$Child1. Failed to push OutputTag with id 
> 'child2' to operator. This can occur when multiple OutputTags with different 
> types but identical names are being used.
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
> ......
>  
> However, if I replace '{*}filter(Objects::nonNull){*}' (at lines #71 and #90)
> with the logically equivalent '{*}filter(x->x!=null){*}' (at lines #70 and #89),
> everything works.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
