Hi Terry,

KafkaException comes in through the job's dependencies (it's defined in the
kafka-clients jar packed up in the fat job jar) and is on either the TM nor
JM default classpath. The job running in the TM includes the job
dependencies and so can throw a KafkaException but the JM can't deserialize
it because it's not available on the default classpath.

I'm suggesting defensively wrapping all causes of a CheckpointException in
a SerializedThrowable (in addition to defensively wrapping everything
except a CheckpointException). I believe SerializedThrowable is there
specifically for this case, i.e. where a job in the TM sends the JM an
exception that's defined only in the job itself.

It might be clearer if I just put up a PR :) I'd be happy to and it'll be
very short.

Best,

Jeff

On Sun, Sep 22, 2019 at 7:45 PM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi, Jeffrey~
>
> Thanks for your detailed explanation and I understood why job failed with
> flink 1.9.
>
> But the two fixes you mentioned may still not work well. As KafkaException
> can be serialized
> in TM for there is necessary jar in its classpath but not in JM, so maybe
> it’s impossible to check
> the possibility of serialization in advance.
> Do I understand right?
>
>
>
> Best,
> Terry Wang
>
>
>
> > 在 2019年9月23日,上午5:17,Jeffrey Martin <jeffrey.martin...@gmail.com> 写道:
> >
> > Thanks for suggestion, Terry. I've investigated a bit further.
> >
> > DeclineCheckpoint specifically checks for the possibility of an exception
> > that the JM won't be able to deserialize (i.e. anything other than a
> > Checkpoint exception). It just doesn't check for the possibility of a
> > CheckpointException that can't be deserialize because its root cause
> can't
> > be deserialize.
> >
> > I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
> > 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
> > Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
> > checkpoints to fail sometimes. That caused the KafkaException to be
> > propagated to the JM as the root cause of a CheckpointException.
> >
> > On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
> >
> >> Hi, Jeffrey~
> >>
> >> I think two fixes you mentioned may not work in your case.
> >> This problem https://issues.apache.org/jira/browse/FLINK-14076 <
> >> https://issues.apache.org/jira/browse/FLINK-14076> is caused by TM and
> JM
> >> jar package environment inconsistent or jar loaded behavior
> inconsistent in
> >> nature.
> >> Maybe the behavior  of standalone cluster’s dynamic class loader changed
> >> in flink 1.9 since you mentioned that your program run normally in flink
> >> 1.8.
> >> Just a thought from me.
> >> Hope to be useful~
> >>
> >> Best,
> >> Terry Wang
> >>
> >>
> >>
> >>> 在 2019年9月21日,上午2:58,Jeffrey Martin <jeffrey.martin...@gmail.com> 写道:
> >>>
> >>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >>>
> >>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >>>
> >>> If FlinkKafkaProducer fails while checkpointing, it throws a
> >> KafkaException
> >>> which gets wrapped in a CheckpointException which is sent to the JM as
> a
> >>> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so
> >> the
> >>> JM throws a fairly cryptic ClassNotFoundException. The details of the
> >>> KafkaException wind up suppressed so it's impossible to figure out what
> >>> actually went wrong.
> >>>
> >>> I can think of two fixes that would prevent this from occurring in the
> >>> Kafka or other connectors in the future:
> >>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> >>> rather than allowing CheckpointExceptions with non-deserializable root
> >>> causes to slip through
> >>> 2. CheckpointException should always capture its wrapped exception as a
> >>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> >>> rather than 'super(cause)').
> >>>
> >>> Thoughts?
> >>
> >>
>
>

Reply via email to