Thanks for the suggestion, Terry. I've investigated a bit further.

DeclineCheckpoint specifically checks for the possibility of an exception
that the JM won't be able to deserialize (i.e. anything other than a
CheckpointException). It just doesn't check for the possibility of a
CheckpointException that can't be deserialized because its root cause can't
be deserialized.
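
To make the gap concrete, here is a rough sketch in plain Java of what
capturing the root cause in string form could look like. None of these
classes are Flink's: ConnectorOnlyException stands in for KafkaException,
StringifiedThrowable is a simplified, hypothetical analogue of a serialized
throwable, and SketchCheckpointException is a hypothetical wrapper. The only
point is that the wrapped chain no longer references the connector's class.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for a connector exception (like KafkaException) whose
// class is on the sender's classpath but not on the receiver's.
class ConnectorOnlyException extends RuntimeException {
    ConnectorOnlyException(String message) {
        super(message);
    }
}

// Hypothetical, simplified analogue of a serialized throwable: it stores the
// original's class name and stack trace as strings and keeps no reference to
// the original object, so only JDK types travel with it.
class StringifiedThrowable extends Exception {
    private final String originalClassName;
    private final String fullStackTrace;

    StringifiedThrowable(Throwable original) {
        super(original.getClass().getName() + ": " + original.getMessage());
        this.originalClassName = original.getClass().getName();
        StringWriter buffer = new StringWriter();
        original.printStackTrace(new PrintWriter(buffer, true));
        this.fullStackTrace = buffer.toString();
    }

    String getOriginalClassName() {
        return originalClassName;
    }

    String getFullStackTrace() {
        return fullStackTrace;
    }
}

// Hypothetical wrapper (not Flink's CheckpointException) that always converts
// its cause before storing it.
class SketchCheckpointException extends Exception {
    SketchCheckpointException(String message, Throwable cause) {
        super(message, new StringifiedThrowable(cause));
    }
}

class DeclineSketch {
    // Walks the cause chain and reports whether any link is of the given type.
    static boolean chainContains(Throwable top, Class<? extends Throwable> type) {
        for (Throwable t = top; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable root = new ConnectorOnlyException("broker unavailable");
        Exception wrapped = new SketchCheckpointException("checkpoint declined", root);
        // The connector class is gone from the chain, but its details survive as text.
        System.out.println(chainContains(wrapped, ConnectorOnlyException.class));
        System.out.println(wrapped.getCause().getMessage());
    }
}
```

The trade-off in a sketch like this is that the receiver can no longer catch
the original exception type; it only gets the class name and stack trace as
text, which is still enough to see what went wrong.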

I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
1.9 broke the FlinkKafkaProducer API so I wound up having to set the
Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
checkpoints to fail sometimes. That caused the KafkaException to be
propagated to the JM as the root cause of a CheckpointException.

On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi, Jeffrey~
>
> I think the two fixes you mentioned may not work in your case.
> This problem (https://issues.apache.org/jira/browse/FLINK-14076) is, at
> root, caused by an inconsistency between the TM and JM jar environments, or
> in how their jars are loaded.
> Maybe the behavior of the standalone cluster's dynamic class loader changed
> in Flink 1.9, since you mentioned that your program ran normally on Flink
> 1.8.
> Just a thought from me.
> Hope this is useful~
>
> Best,
> Terry Wang
>
>
>
> > On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >
> > JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >
> > I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >
> > If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException
> > which gets wrapped in a CheckpointException which is sent to the JM as a
> > DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the
> > JM throws a fairly cryptic ClassNotFoundException. The details of the
> > KafkaException wind up suppressed so it's impossible to figure out what
> > actually went wrong.
> >
> > I can think of two fixes that would prevent this from occurring in the
> > Kafka or other connectors in the future:
> > 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> > rather than allowing CheckpointExceptions with non-deserializable root
> > causes to slip through
> > 2. CheckpointException should always capture its wrapped exception as a
> > SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> > rather than 'super(cause)').
> >
> > Thoughts?
>
>
