Hi Jeffrey,

You are right and I understood what you have  said  after I just studied the 
class org.apache.flink.util.SerializedThrowable.
I prefer the fixes no.2 you mentioned:
        CheckpointException should always capture its wrapped exception as a 
SerializedThrowable 
Looking forward to seeing your pr soon :)

Best,
Terry Wang



> 在 2019年9月23日,上午11:48,Jeffrey Martin <jeffrey.martin...@gmail.com> 写道:
> 
> Hi Terry,
> 
> KafkaException comes in through the job's dependencies (it's defined in the
> kafka-clients jar packed up in the fat job jar) and is on either the TM nor
> JM default classpath. The job running in the TM includes the job
> dependencies and so can throw a KafkaException but the JM can't deserialize
> it because it's not available on the default classpath.
> 
> I'm suggesting defensively wrapping all causes of a CheckpointException in
> a SerializedThrowable (in addition to defensively wrapping everything
> except a CheckpointException). I believe SerializedThrowable is there
> specifically for this case, i.e. where a job in the TM sends the JM an
> exception that's defined only in the job itself.
> 
> It might be clearer if I just put up a PR :) I'd be happy to and it'll be
> very short.
> 
> Best,
> 
> Jeff
> 
> On Sun, Sep 22, 2019 at 7:45 PM Terry Wang <zjuwa...@gmail.com> wrote:
> 
>> Hi, Jeffrey~
>> 
>> Thanks for your detailed explanation and I understood why job failed with
>> flink 1.9.
>> 
>> But the two fixes you mentioned may still not work well. As KafkaException
>> can be serialized
>> in TM for there is necessary jar in its classpath but not in JM, so maybe
>> it’s impossible to check
>> the possibility of serialization in advance.
>> Do I understand right?
>> 
>> 
>> 
>> Best,
>> Terry Wang
>> 
>> 
>> 
>>> 在 2019年9月23日,上午5:17,Jeffrey Martin <jeffrey.martin...@gmail.com> 写道:
>>> 
>>> Thanks for suggestion, Terry. I've investigated a bit further.
>>> 
>>> DeclineCheckpoint specifically checks for the possibility of an exception
>>> that the JM won't be able to deserialize (i.e. anything other than a
>>> Checkpoint exception). It just doesn't check for the possibility of a
>>> CheckpointException that can't be deserialize because its root cause
>> can't
>>> be deserialize.
>>> 
>>> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
>>> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
>>> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
>>> checkpoints to fail sometimes. That caused the KafkaException to be
>>> propagated to the JM as the root cause of a CheckpointException.
>>> 
>>> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
>>> 
>>>> Hi, Jeffrey~
>>>> 
>>>> I think two fixes you mentioned may not work in your case.
>>>> This problem https://issues.apache.org/jira/browse/FLINK-14076 <
>>>> https://issues.apache.org/jira/browse/FLINK-14076> is caused by TM and
>> JM
>>>> jar package environment inconsistent or jar loaded behavior
>> inconsistent in
>>>> nature.
>>>> Maybe the behavior  of standalone cluster’s dynamic class loader changed
>>>> in flink 1.9 since you mentioned that your program run normally in flink
>>>> 1.8.
>>>> Just a thought from me.
>>>> Hope to be useful~
>>>> 
>>>> Best,
>>>> Terry Wang
>>>> 
>>>> 
>>>> 
>>>>> 在 2019年9月21日,上午2:58,Jeffrey Martin <jeffrey.martin...@gmail.com> 写道:
>>>>> 
>>>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
>>>>> 
>>>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
>>>>> 
>>>>> If FlinkKafkaProducer fails while checkpointing, it throws a
>>>> KafkaException
>>>>> which gets wrapped in a CheckpointException which is sent to the JM as
>> a
>>>>> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so
>>>> the
>>>>> JM throws a fairly cryptic ClassNotFoundException. The details of the
>>>>> KafkaException wind up suppressed so it's impossible to figure out what
>>>>> actually went wrong.
>>>>> 
>>>>> I can think of two fixes that would prevent this from occurring in the
>>>>> Kafka or other connectors in the future:
>>>>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
>>>>> rather than allowing CheckpointExceptions with non-deserializable root
>>>>> causes to slip through
>>>>> 2. CheckpointException should always capture its wrapped exception as a
>>>>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
>>>>> rather than 'super(cause)').
>>>>> 
>>>>> Thoughts?
>>>> 
>>>> 
>> 
>> 

Reply via email to