Thanks for the suggestion, Terry. I've investigated a bit further. DeclineCheckpoint specifically checks for the possibility of an exception that the JM won't be able to deserialize (i.e., anything other than a CheckpointException). It just doesn't check for the possibility of a CheckpointException that can't be deserialized because its root cause can't be deserialized.
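To make the gap concrete, here is a minimal Java sketch that runs without Flink. CheckpointException and ConnectorException are hypothetical stand-ins (the latter plays the role of KafkaException), and topLevelCheckPasses only approximates the kind of top-level type check described above; it is not the actual DeclineCheckpoint code. A JM without the connector jar is simulated by an ObjectInputStream whose resolveClass refuses to load the connector class. The top-level check passes, yet deserialization still fails with a ClassNotFoundException, because the unloadable class is reachable only through the cause chain.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.Set;

// Hypothetical simulation of the problem; no Flink classes are used.
public class CauseChainDemo {

    // Hypothetical stand-in for Flink's CheckpointException.
    static class CheckpointException extends Exception {
        CheckpointException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a connector exception such as KafkaException.
    static class ConnectorException extends RuntimeException {
        ConnectorException(String message) {
            super(message);
        }
    }

    // Simulates a JM classpath that is missing some classes: resolving them fails.
    static class RestrictedInputStream extends ObjectInputStream {
        private final Set<String> missingClasses;

        RestrictedInputStream(InputStream in, Set<String> missingClasses) throws IOException {
            super(in);
            this.missingClasses = missingClasses;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (missingClasses.contains(desc.getName())) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    // Approximates the check described above: only the top-level type is inspected.
    static boolean topLevelCheckPasses(Throwable reason) {
        return reason instanceof CheckpointException;
    }

    // Serializes the reason, then deserializes it as the simulated JM would.
    // Returns the failure, or null if deserialization succeeded.
    static Exception deserializeOnJobManager(Throwable reason, Set<String> missingOnJm) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(reason);
            }
            try (ObjectInputStream in = new RestrictedInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), missingOnJm)) {
                in.readObject();
            }
            return null;
        } catch (IOException | ClassNotFoundException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable reason = new CheckpointException("checkpoint declined",
                new ConnectorException("transaction failed"));
        Set<String> missingOnJm = Set.of(ConnectorException.class.getName());

        System.out.println("top-level check passes: " + topLevelCheckPasses(reason));
        System.out.println("JM deserialization failure: " + deserializeOnJobManager(reason, missingOnJm));
    }
}
```

Running main should report that the top-level check passes while the simulated JM fails with a ClassNotFoundException naming the connector class, which mirrors the cryptic error described in the ticket.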
I think the job succeeding on 1.8 and failing on 1.9 was a red herring. 1.9 broke the FlinkKafkaProducer API, so I wound up having to set the Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused checkpoints to fail sometimes. That caused the KafkaException to be propagated to the JM as the root cause of a CheckpointException.

On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <zjuwa...@gmail.com> wrote:
> Hi, Jeffrey~
>
> I think the two fixes you mentioned may not work in your case.
> This problem (https://issues.apache.org/jira/browse/FLINK-14076) is, in
> essence, caused by the TM and JM jar environments being inconsistent, or
> by jars being loaded inconsistently.
> Maybe the behavior of the standalone cluster's dynamic class loader
> changed in Flink 1.9, since you mentioned that your program ran normally
> in Flink 1.8.
> Just a thought from me.
> Hope it's useful~
>
> Best,
> Terry Wang
>
> On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <jeffrey.martin...@gmail.com> wrote:
> >
> > JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >
> > I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >
> > If FlinkKafkaProducer fails while checkpointing, it throws a
> > KafkaException, which gets wrapped in a CheckpointException that is
> > sent to the JM as a DeclineCheckpoint. KafkaException isn't on the JM's
> > default classpath, so the JM throws a fairly cryptic
> > ClassNotFoundException. The details of the KafkaException wind up
> > suppressed, so it's impossible to figure out what actually went wrong.
> >
> > I can think of two fixes that would prevent this from occurring in the
> > Kafka or other connectors in the future:
> > 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> >    rather than allowing CheckpointExceptions with non-deserializable
> >    root causes to slip through.
> > 2. CheckpointException should always capture its wrapped exception as a
> >    SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> >    rather than 'super(cause)').
> >
> > Thoughts?
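For illustration, proposal 2 from the quoted message could look roughly like the following minimal Java sketch. It is not Flink code: SerializedCause is a hypothetical, simplified stand-in for Flink's SerializedThrowable that keeps only the original class name, message, and stack trace, and CheckpointException, ConnectorException, and the restricted ObjectInputStream are hypothetical stand-ins used to simulate a JM without the connector jar. Because the wrapper never holds the original exception object, the connector class never appears in the serialized bytes, so the simulated JM can deserialize it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;
import java.util.Set;

// Hypothetical sketch of proposal 2; no Flink classes are used.
public class SerializedCauseDemo {

    // Simplified, hypothetical stand-in for SerializedThrowable: stores only JDK types
    // (Strings and StackTraceElements), never the original exception object.
    // Assumes an acyclic cause chain.
    static class SerializedCause extends Exception {
        private final String originalClassName;

        SerializedCause(Throwable original) {
            super(original.getClass().getName() + ": " + original.getMessage(),
                    original.getCause() == null ? null : new SerializedCause(original.getCause()));
            this.originalClassName = original.getClass().getName();
            setStackTrace(original.getStackTrace());
        }

        String getOriginalClassName() {
            return originalClassName;
        }
    }

    // Hypothetical CheckpointException that wraps its cause, as in super(new SerializedThrowable(cause)).
    static class CheckpointException extends Exception {
        CheckpointException(String message, Throwable cause) {
            super(message, cause == null ? null : new SerializedCause(cause));
        }
    }

    // Hypothetical stand-in for a connector exception such as KafkaException.
    static class ConnectorException extends RuntimeException {
        ConnectorException(String message) {
            super(message);
        }
    }

    // Simulates a JM classpath that is missing some classes: resolving them fails.
    static class RestrictedInputStream extends ObjectInputStream {
        private final Set<String> missingClasses;

        RestrictedInputStream(InputStream in, Set<String> missingClasses) throws IOException {
            super(in);
            this.missingClasses = missingClasses;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (missingClasses.contains(desc.getName())) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    // Serializes the reason and deserializes it as the simulated JM would.
    static Object deserializeOnJobManager(Throwable reason, Set<String> missingOnJm) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(reason);
            }
            try (ObjectInputStream in = new RestrictedInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), missingOnJm)) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("JM could not load " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Throwable reason = new CheckpointException("checkpoint declined",
                new ConnectorException("transaction failed"));
        Set<String> missingOnJm = Set.of(ConnectorException.class.getName());

        Throwable received = (Throwable) deserializeOnJobManager(reason, missingOnJm);
        System.out.println(received + " caused by " + received.getCause());
    }
}
```

Proposal 1 would apply the same kind of wrapping at a single point instead, in DeclineCheckpoint, for every reason regardless of its type. In this sketch the JM receives only the original class name, message, and stack trace, which is enough to diagnose the failure but not to catch the original exception by type.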