Draft PR here: https://github.com/apache/flink/pull/9742 There might be some failing tests (still waiting on Travis), but I think the diff is small enough for you to evaluate whether the approach is acceptable.
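For anyone skimming the thread below: the essence of the change is to serialize the cause eagerly on the TM side, so the JM never needs the cause's class on its classpath. Here is a minimal, self-contained sketch of that idea; it is a hypothetical simplified stand-in for Flink's org.apache.flink.util.SerializedThrowable (class and method names are illustrative, not the actual Flink or PR code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical simplified version of the SerializedThrowable idea.
// The cause is serialized to bytes eagerly on the sender (TM) side; the
// receiver (JM) only ever deserializes this wrapper class, which is on
// its own classpath, and can *optionally* try to rebuild the cause.
class SerializedThrowableSketch extends Exception {
    private final byte[] serializedCause; // null if the cause itself failed to serialize

    SerializedThrowableSketch(Throwable cause) {
        // Capture class name and message as a plain string, so something
        // readable survives even if deserialization is impossible.
        super(cause.getClass().getName() + ": " + cause.getMessage());
        byte[] bytes;
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(cause);
            oos.close();
            bytes = bos.toByteArray();
        } catch (IOException e) {
            bytes = null; // fall back to the message captured above
        }
        this.serializedCause = bytes;
    }

    // Best-effort reconstruction on the receiver; returns null if the
    // original class is not loadable there.
    Throwable deserializeCause() {
        if (serializedCause == null) {
            return null;
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(serializedCause))) {
            return (Throwable) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            return null;
        }
    }
}
```

Even when the receiver cannot (or chooses not to) rebuild the original throwable, the message captured in the constructor still identifies the underlying error, which is exactly the information that gets lost in the ClassNotFoundException scenario discussed below.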
On Sun, Sep 22, 2019 at 9:10 PM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi Jeffrey,
>
> You are right, and I understood what you said after I studied the class
> org.apache.flink.util.SerializedThrowable.
> I prefer fix no. 2 that you mentioned:
> CheckpointException should always capture its wrapped exception as a
> SerializedThrowable.
> Looking forward to seeing your PR soon :)
>
> Best,
> Terry Wang
>
> > On Sep 23, 2019, at 11:48 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >
> > Hi Terry,
> >
> > KafkaException comes in through the job's dependencies (it's defined in
> > the kafka-clients jar packed into the fat job jar) and is on neither the
> > TM nor the JM default classpath. The job running in the TM includes the
> > job dependencies and so can throw a KafkaException, but the JM can't
> > deserialize it because it's not available on the default classpath.
> >
> > I'm suggesting defensively wrapping all causes of a CheckpointException
> > in a SerializedThrowable (in addition to defensively wrapping everything
> > except a CheckpointException). I believe SerializedThrowable is there
> > specifically for this case, i.e. where a job in the TM sends the JM an
> > exception that's defined only in the job itself.
> >
> > It might be clearer if I just put up a PR :) I'd be happy to, and it'll
> > be very short.
> >
> > Best,
> >
> > Jeff
> >
> > On Sun, Sep 22, 2019 at 7:45 PM Terry Wang <zjuwa...@gmail.com> wrote:
> >
> >> Hi, Jeffrey~
> >>
> >> Thanks for your detailed explanation; I now understand why the job
> >> failed with Flink 1.9.
> >>
> >> But the two fixes you mentioned may still not work well. KafkaException
> >> can be serialized in the TM, since the necessary jar is on its
> >> classpath but not on the JM's, so maybe it's impossible to check
> >> whether it can be deserialized in advance.
> >> Do I understand right?
> >>
> >> Best,
> >> Terry Wang
> >>
> >>> On Sep 23, 2019, at 5:17 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >>>
> >>> Thanks for the suggestion, Terry. I've investigated a bit further.
> >>>
> >>> DeclineCheckpoint specifically checks for the possibility of an
> >>> exception that the JM won't be able to deserialize (i.e. anything
> >>> other than a CheckpointException). It just doesn't check for the
> >>> possibility of a CheckpointException that can't be deserialized
> >>> because its root cause can't be deserialized.
> >>>
> >>> I think the job succeeding on 1.8 and failing on 1.9 was a red
> >>> herring -- 1.9 broke the FlinkKafkaProducer API, so I wound up having
> >>> to set the Semantic explicitly on 1.9. I set it to EXACTLY_ONCE,
> >>> which caused checkpoints to fail sometimes. That caused the
> >>> KafkaException to be propagated to the JM as the root cause of a
> >>> CheckpointException.
> >>>
> >>> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
> >>>
> >>>> Hi, Jeffrey~
> >>>>
> >>>> I think the two fixes you mentioned may not work in your case.
> >>>> This problem, https://issues.apache.org/jira/browse/FLINK-14076, is
> >>>> caused by the TM and JM having inconsistent jar environments, or by
> >>>> inconsistent jar loading behavior.
> >>>> Maybe the behavior of the standalone cluster's dynamic class loader
> >>>> changed in Flink 1.9, since you mentioned that your program ran
> >>>> normally on Flink 1.8.
> >>>> Just a thought from me. Hope it's useful~
> >>>>
> >>>> Best,
> >>>> Terry Wang
> >>>>
> >>>>> On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >>>>>
> >>>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >>>>>
> >>>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >>>>>
> >>>>> If FlinkKafkaProducer fails while checkpointing, it throws a
> >>>>> KafkaException, which gets wrapped in a CheckpointException, which
> >>>>> is sent to the JM as a DeclineCheckpoint. KafkaException isn't on
> >>>>> the JM default classpath, so the JM throws a fairly cryptic
> >>>>> ClassNotFoundException. The details of the KafkaException wind up
> >>>>> suppressed, so it's impossible to figure out what actually went
> >>>>> wrong.
> >>>>>
> >>>>> I can think of two fixes that would prevent this from occurring in
> >>>>> the Kafka or other connectors in the future:
> >>>>> 1. DeclineCheckpoint should always send a SerializedThrowable to
> >>>>> the JM rather than allowing CheckpointExceptions with
> >>>>> non-deserializable root causes to slip through.
> >>>>> 2. CheckpointException should always capture its wrapped exception
> >>>>> as a SerializedThrowable (i.e., use 'super(new
> >>>>> SerializedThrowable(cause))' rather than 'super(cause)').
> >>>>>
> >>>>> Thoughts?
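As a footnote, the JM-side failure described in the original message can be reproduced without Flink or Kafka at all. The sketch below (names are illustrative; FakeKafkaException is a stand-in, not the real org.apache.kafka.common.KafkaException) serializes an exception and then deserializes it through an ObjectInputStream whose resolveClass refuses to load the exception's class, simulating a receiver whose classpath lacks the connector jar:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

class MissingClassDemo {
    // Stand-in for KafkaException: present on the "sender" (TM) classpath,
    // pretended absent on the "receiver" (JM) classpath below.
    static class FakeKafkaException extends RuntimeException {
        FakeKafkaException(String msg) { super(msg); }
    }

    // Sender side: ordinary Java serialization of the throwable.
    static byte[] serialize(Throwable t) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(t);
        }
        return bos.toByteArray();
    }

    // Receiver side: deserialize while refusing to resolve the exception's
    // class, the way the JM cannot load KafkaException from its classpath.
    static Throwable deserializeOnReceiver(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().contains("FakeKafkaException")) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return (Throwable) ois.readObject();
        }
    }
}
```

The readObject call fails with ClassNotFoundException, and the original exception's message is gone at that point, which is why fix no. 2 (capturing the cause as a SerializedThrowable before it crosses the TM/JM boundary) recovers the lost diagnostics.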