Hi, Jeffrey~

Thanks for your detailed explanation. I now understand why the job failed with
Flink 1.9.

But the two fixes you mentioned may still not work well. KafkaException can be
serialized in the TM because the necessary jar is on its classpath, but that jar
is not on the JM's classpath, so the JM cannot deserialize it. So it may be
impossible to check in advance whether serialization will work.
Do I understand this correctly?
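
To make the classpath mismatch concrete, here is a minimal plain-Java sketch with no Flink or Kafka dependency. Everything in it is an assumption made for illustration: ConnectorException stands in for KafkaException, FlattenedThrowable is a hypothetical stand-in for the idea behind SerializedThrowable (it is not Flink's class), and the missing jar on the JM is simulated by an ObjectInputStream that refuses to resolve one class name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

class ClasspathMismatchSketch {

    /** Hypothetical stand-in for a connector exception such as KafkaException. */
    static class ConnectorException extends RuntimeException {
        ConnectorException(String message) { super(message); }
    }

    /** Hypothetical stand-in: keeps only JDK types (strings and stack trace elements). */
    static class FlattenedThrowable extends Exception {
        final String originalClassName;

        FlattenedThrowable(Throwable original) {
            super(original.getClass().getName() + ": " + original.getMessage());
            this.originalClassName = original.getClass().getName();
            setStackTrace(original.getStackTrace());
        }
    }

    /** "TM side": plain Java serialization, which succeeds because the class is present. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** "JM side": deserializes while pretending that missingClass is not on the classpath. */
    static Object deserializeWithout(byte[] data, String missingClass)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().equals(missingClass)) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        }
    }

    /** Returns true if the payload can be read without the missing class. */
    static boolean readable(byte[] data, String missingClass) throws IOException {
        try {
            deserializeWithout(data, missingClass);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String missing = ConnectorException.class.getName();
        Throwable cause = new ConnectorException("broker unavailable");

        // Wrapper that carries the raw cause, analogous to super(cause).
        byte[] raw = serialize(new Exception("checkpoint declined", cause));
        // Wrapper that carries a flattened cause, analogous to fix 2 in the thread.
        byte[] flat = serialize(
                new Exception("checkpoint declined", new FlattenedThrowable(cause)));

        System.out.println("raw cause readable on receiver: " + readable(raw, missing));
        System.out.println("flattened cause readable on receiver: " + readable(flat, missing));
    }
}
```

In this sketch the sender never needs to know what the receiver can load: serialization succeeds in both cases, and only the payload with the raw cause fails on the simulated receiver. Whether the same holds inside Flink depends on how DeclineCheckpoint and CheckpointException are actually implemented, which this sketch does not model.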



Best,
Terry Wang



> On Sep 23, 2019, at 5:17 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> 
> Thanks for the suggestion, Terry. I've investigated a bit further.
> 
> DeclineCheckpoint specifically checks for the possibility of an exception
> that the JM won't be able to deserialize (i.e. anything other than a
> CheckpointException). It just doesn't check for the possibility of a
> CheckpointException that can't be deserialized because its root cause can't
> be deserialized.
> 
> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
> checkpoints to fail sometimes. That caused the KafkaException to be
> propagated to the JM as the root cause of a CheckpointException.
> 
> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
> 
>> Hi, Jeffrey~
>> 
>> I think the two fixes you mentioned may not work in your case.
>> This problem (https://issues.apache.org/jira/browse/FLINK-14076) is caused,
>> in essence, by the TM and JM having inconsistent jar environments or
>> inconsistent class-loading behavior.
>> Maybe the behavior of the standalone cluster's dynamic class loader changed
>> in Flink 1.9, since you mentioned that your program ran normally in Flink
>> 1.8.
>> Just a thought from me.
>> Hope this is useful~
>> 
>> Best,
>> Terry Wang
>> 
>> 
>> 
>>> On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
>>> 
>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
>>> 
>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
>>> 
>>> If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException
>>> which gets wrapped in a CheckpointException which is sent to the JM as a
>>> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the
>>> JM throws a fairly cryptic ClassNotFoundException. The details of the
>>> KafkaException wind up suppressed so it's impossible to figure out what
>>> actually went wrong.
>>> 
>>> I can think of two fixes that would prevent this from occurring in the
>>> Kafka or other connectors in the future:
>>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
>>> rather than allowing CheckpointExceptions with non-deserializable root
>>> causes to slip through
>>> 2. CheckpointException should always capture its wrapped exception as a
>>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
>>> rather than 'super(cause)').
>>> 
>>> Thoughts?
>> 
>> 
