Draft PR here: https://github.com/apache/flink/pull/9742
There might be some failing tests (still waiting on Travis), but I think
the diff is small enough for you to evaluate the approach for acceptability.
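
For anyone skimming, here is a minimal, self-contained sketch of the idea behind fix no. 2 from the thread below. It is not the PR's code and it does not use Flink: JobOnlyException stands in for KafkaException, WrappedThrowable is a much-simplified stand-in for org.apache.flink.util.SerializedThrowable, and readOnJobManager simulates a JM that lacks the job's classes by refusing to resolve JobOnlyException. All of those names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;

public class CheckpointWrapSketch {

    /** Hypothetical stand-in for an exception that exists only in the job jar. */
    static class JobOnlyException extends RuntimeException {
        JobOnlyException(String message) { super(message); }
    }

    /** Simplified stand-in for SerializedThrowable: carries only JDK-typed state. */
    static class WrappedThrowable extends Exception {
        final String originalClassName;

        WrappedThrowable(Throwable original) {
            super(original.getClass().getName() + ": " + original.getMessage());
            this.originalClassName = original.getClass().getName();
            setStackTrace(original.getStackTrace());
        }
    }

    /** Current behaviour as described in the thread: the raw cause is kept. */
    static class RawCheckpointException extends Exception {
        RawCheckpointException(String message, Throwable cause) { super(message, cause); }
    }

    /** Fix no. 2 in spirit: the cause is wrapped before it reaches super(...). */
    static class SafeCheckpointException extends Exception {
        SafeCheckpointException(String message, Throwable cause) {
            super(message, new WrappedThrowable(cause));
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Deserializes as if JobOnlyException were missing from the reader's classpath. */
    static Object readOnJobManager(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().endsWith("JobOnlyException")) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        }
    }

    /** Returns the cause's message after the simulated transfer, or null if a class is missing. */
    static String causeMessageAfterTransfer(Throwable t) {
        try {
            Throwable received = (Throwable) readOnJobManager(serialize(t));
            return received.getCause().getMessage();
        } catch (ClassNotFoundException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Throwable cause = new JobOnlyException("broker down");
        System.out.println("raw:  " + causeMessageAfterTransfer(new RawCheckpointException("declined", cause)));
        System.out.println("safe: " + causeMessageAfterTransfer(new SafeCheckpointException("declined", cause)));
    }
}
```

In this sketch the raw variant fails with a ClassNotFoundException on the reading side (reported here as null), while the wrapped variant arrives intact and still carries the original class name and message. The real SerializedThrowable does more than this stand-in, so treat the sketch as an illustration of the failure mode only.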

On Sun, Sep 22, 2019 at 9:10 PM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi Jeffrey,
>
> You are right, and I understood what you said after studying
> the class org.apache.flink.util.SerializedThrowable.
> I prefer fix no. 2 that you mentioned:
>         CheckpointException should always capture its wrapped exception as
> a SerializedThrowable
> Looking forward to seeing your PR soon :)
>
> Best,
> Terry Wang
>
>
>
> > On Sep 23, 2019, at 11:48 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >
> > Hi Terry,
> >
> > KafkaException comes in through the job's dependencies (it's defined in the
> > kafka-clients jar packed up in the fat job jar) and is on neither the TM nor
> > the JM default classpath. The job running in the TM includes the job
> > dependencies and so can throw a KafkaException, but the JM can't deserialize
> > it because it's not available on the default classpath.
> >
> > I'm suggesting defensively wrapping all causes of a CheckpointException in
> > a SerializedThrowable (in addition to defensively wrapping everything
> > except a CheckpointException). I believe SerializedThrowable is there
> > specifically for this case, i.e. where a job in the TM sends the JM an
> > exception that's defined only in the job itself.
> >
> > It might be clearer if I just put up a PR :) I'd be happy to and it'll be
> > very short.
> >
> > Best,
> >
> > Jeff
> >
> > On Sun, Sep 22, 2019 at 7:45 PM Terry Wang <zjuwa...@gmail.com> wrote:
> >
> >> Hi, Jeffrey~
> >>
> >> Thanks for your detailed explanation. I now understand why the job failed
> >> with Flink 1.9.
> >>
> >> But the two fixes you mentioned may still not work well. KafkaException
> >> can be serialized in the TM because the necessary jar is on its classpath,
> >> but that jar is not on the JM's classpath, so it may be impossible to check
> >> in advance whether serialization will work.
> >> Do I understand this correctly?
> >>
> >>
> >>
> >> Best,
> >> Terry Wang
> >>
> >>
> >>
> >>> On Sep 23, 2019, at 5:17 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >>>
> >>> Thanks for the suggestion, Terry. I've investigated a bit further.
> >>>
> >>> DeclineCheckpoint specifically checks for the possibility of an exception
> >>> that the JM won't be able to deserialize (i.e. anything other than a
> >>> CheckpointException). It just doesn't check for the possibility of a
> >>> CheckpointException that can't be deserialized because its root cause
> >>> can't be deserialized.
> >>>
> >>> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
> >>> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
> >>> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
> >>> checkpoints to fail sometimes. That caused the KafkaException to be
> >>> propagated to the JM as the root cause of a CheckpointException.
> >>>
> >>> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
> >>>
> >>>> Hi, Jeffrey~
> >>>>
> >>>> I think the two fixes you mentioned may not work in your case.
> >>>> This problem (https://issues.apache.org/jira/browse/FLINK-14076) is
> >>>> caused, at root, by the TM and JM having inconsistent jar environments
> >>>> or inconsistent jar-loading behavior.
> >>>> Maybe the behavior of the standalone cluster's dynamic class loader
> >>>> changed in Flink 1.9, since you mentioned that your program ran normally
> >>>> on Flink 1.8.
> >>>> Just a thought from me.
> >>>> Hope this helps~
> >>>>
> >>>> Best,
> >>>> Terry Wang
> >>>>
> >>>>
> >>>>
> >>>>> On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >>>>>
> >>>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >>>>>
> >>>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >>>>>
> >>>>> If FlinkKafkaProducer fails while checkpointing, it throws a
> >>>>> KafkaException which gets wrapped in a CheckpointException which is sent
> >>>>> to the JM as a DeclineCheckpoint. KafkaException isn't on the JM default
> >>>>> classpath, so the JM throws a fairly cryptic ClassNotFoundException. The
> >>>>> details of the KafkaException wind up suppressed so it's impossible to
> >>>>> figure out what actually went wrong.
> >>>>>
> >>>>> I can think of two fixes that would prevent this from occurring in the
> >>>>> Kafka or other connectors in the future:
> >>>>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> >>>>> rather than allowing CheckpointExceptions with non-deserializable root
> >>>>> causes to slip through
> >>>>> 2. CheckpointException should always capture its wrapped exception as a
> >>>>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> >>>>> rather than 'super(cause)').
> >>>>>
> >>>>> Thoughts?
> >>>>
> >>>>
> >>
> >>
>
>
