Hi Jeffrey,

You are right. I understood your point after studying the class
org.apache.flink.util.SerializedThrowable. I prefer fix no. 2 that you
mentioned: CheckpointException should always capture its wrapped exception
as a SerializedThrowable.

Looking forward to seeing your PR soon :)

Best,
Terry Wang

> On Sep 23, 2019, at 11:48 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
>
> Hi Terry,
>
> KafkaException comes in through the job's dependencies (it's defined in the
> kafka-clients jar packed up in the fat job jar) and is on neither the TM nor
> the JM default classpath. The job running in the TM includes the job
> dependencies and so can throw a KafkaException, but the JM can't deserialize
> it because it's not available on the default classpath.
>
> I'm suggesting defensively wrapping all causes of a CheckpointException in
> a SerializedThrowable (in addition to defensively wrapping everything
> except a CheckpointException). I believe SerializedThrowable is there
> specifically for this case, i.e. where a job in the TM sends the JM an
> exception that's defined only in the job itself.
>
> It might be clearer if I just put up a PR :) I'd be happy to and it'll be
> very short.
>
> Best,
>
> Jeff
>
> On Sun, Sep 22, 2019 at 7:45 PM Terry Wang <zjuwa...@gmail.com> wrote:
>
>> Hi, Jeffrey~
>>
>> Thanks for your detailed explanation; I now understand why the job failed
>> with Flink 1.9.
>>
>> But the two fixes you mentioned may still not work well. KafkaException
>> can be serialized in the TM, because the necessary jar is on its
>> classpath, but that jar is not available in the JM, so maybe it's
>> impossible to check in advance whether serialization will work.
>> Do I understand this right?
>>
>> Best,
>> Terry Wang
>>
>>> On Sep 23, 2019, at 5:17 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
>>>
>>> Thanks for the suggestion, Terry. I've investigated a bit further.
>>>
>>> DeclineCheckpoint specifically checks for the possibility of an exception
>>> that the JM won't be able to deserialize (i.e. anything other than a
>>> CheckpointException). It just doesn't check for the possibility of a
>>> CheckpointException that can't be deserialized because its root cause
>>> can't be deserialized.
>>>
>>> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
>>> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
>>> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
>>> checkpoints to fail sometimes. That caused the KafkaException to be
>>> propagated to the JM as the root cause of a CheckpointException.
>>>
>>> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
>>>
>>>> Hi, Jeffrey~
>>>>
>>>> I think the two fixes you mentioned may not work in your case.
>>>> This problem, https://issues.apache.org/jira/browse/FLINK-14076, is
>>>> essentially caused by an inconsistency between the TM and JM jar
>>>> environments, or in how they load jars.
>>>> Maybe the behavior of the standalone cluster's dynamic class loader
>>>> changed in Flink 1.9, since you mentioned that your program runs
>>>> normally on Flink 1.8.
>>>> Just a thought from me.
>>>> Hope this is useful~
>>>>
>>>> Best,
>>>> Terry Wang
>>>>
>>>>> On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
>>>>>
>>>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
>>>>>
>>>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
>>>>>
>>>>> If FlinkKafkaProducer fails while checkpointing, it throws a
>>>>> KafkaException which gets wrapped in a CheckpointException which is
>>>>> sent to the JM as a DeclineCheckpoint. KafkaException isn't on the JM
>>>>> default classpath, so the JM throws a fairly cryptic
>>>>> ClassNotFoundException. The details of the KafkaException wind up
>>>>> suppressed so it's impossible to figure out what actually went wrong.
>>>>>
>>>>> I can think of two fixes that would prevent this from occurring in the
>>>>> Kafka or other connectors in the future:
>>>>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
>>>>> rather than allowing CheckpointExceptions with non-deserializable root
>>>>> causes to slip through
>>>>> 2. CheckpointException should always capture its wrapped exception as a
>>>>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
>>>>> rather than 'super(cause)').
>>>>>
>>>>> Thoughts?
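
For illustration, fix no. 2 from the original message can be sketched with plain JDK serialization. This is only a rough sketch under stated assumptions: JobOnlyException, StringifiedThrowable and WrappingCheckpointException are hypothetical stand-ins invented for this example, not Flink classes, and StringifiedThrowable is a simplified string-only wrapper rather than Flink's SerializedThrowable. The overridden resolveClass simulates a JM whose classpath lacks the job jar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;

final class StringifiedCauseDemo {

    /** Hypothetical stand-in for an exception defined only in the job jar (like KafkaException). */
    static final class JobOnlyException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        JobOnlyException(String message) { super(message); }
    }

    /** Hypothetical simplified wrapper: keeps strings and stack frames, never the original object. */
    static final class StringifiedThrowable extends Exception {
        private static final long serialVersionUID = 1L;
        private final String originalClassName;
        private final String fullStackTrace;

        StringifiedThrowable(Throwable original) {
            super(original.getClass().getName() + ": " + original.getMessage());
            this.originalClassName = original.getClass().getName();
            StringWriter buffer = new StringWriter();
            original.printStackTrace(new PrintWriter(buffer, true));
            this.fullStackTrace = buffer.toString();
            setStackTrace(original.getStackTrace());
        }

        String getOriginalClassName() { return originalClassName; }
        String getFullStackTrace() { return fullStackTrace; }
    }

    /** Hypothetical exception applying fix no. 2: wrap the cause before handing it to super. */
    static final class WrappingCheckpointException extends Exception {
        private static final long serialVersionUID = 1L;
        WrappingCheckpointException(String message, Throwable cause) {
            super(message, new StringifiedThrowable(cause));
        }
    }

    /** Serializes t, then deserializes it while pretending JobOnlyException is not on the classpath. */
    static boolean readableWithoutJobClasses(Throwable t) {
        final String hidden = JobOnlyException.class.getName();
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(t);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())) {
                        @Override
                        protected Class<?> resolveClass(ObjectStreamClass desc)
                                throws IOException, ClassNotFoundException {
                            if (desc.getName().equals(hidden)) {
                                throw new ClassNotFoundException(hidden);
                            }
                            return super.resolveClass(desc);
                        }
                    }) {
                in.readObject();
                return true;
            }
        } catch (ClassNotFoundException e) {
            return false; // the receiving side could not load a class in the cause chain
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Throwable cause = new JobOnlyException("send failed");
        Throwable raw = new Exception("checkpoint declined", cause);          // like super(cause)
        Throwable wrapped = new WrappingCheckpointException("checkpoint declined", cause);
        System.out.println("raw cause readable:     " + readableWithoutJobClasses(raw));
        System.out.println("wrapped cause readable: " + readableWithoutJobClasses(wrapped));
        System.out.println("preserved message:      " + wrapped.getCause().getMessage());
    }
}
```

In this sketch the raw variant carries a JobOnlyException object in its cause chain, so the simulated receiver hits a ClassNotFoundException, while the wrapped variant carries only JDK types and the original class name, message and stack trace as text. Whether the real change belongs in CheckpointException or in DeclineCheckpoint is the design question discussed in the thread; the sketch does not settle it.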