XComp commented on code in PR #20590:
URL: https://github.com/apache/flink/pull/20590#discussion_r949006541
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java:
##########
@@ -213,14 +214,26 @@ public RetrievableStateHandle<T> addAndLock(String key, T state)
         // initialize flag to serve the failure case
         boolean discardState = true;
+        final AtomicInteger retryNum = new AtomicInteger(0);
         try {
             // a successful operation will result in the state not being discarded
             discardState =
                     !updateConfigMap(
                             cm -> {
+                                retryNum.incrementAndGet();
                                 try {
                                     return addEntry(cm, key, serializedStoreHandle);
                                 } catch (Exception e) {
+                                    // It could happen the fabric8 k8s client retries a
+                                    // transaction that has already succeeded due to network
+                                    // issues. We let the AlreadyExistException caused by
+                                    // PossibleInconsistentStateException here to avoid
+                                    // discarding the state.
+                                    if (retryNum.get() > 1
+                                            && e instanceof AlreadyExistException) {
+                                        e.initCause(
Review Comment:
I'm wondering whether we actually want to expose the
`PossibleInconsistentStateException` here. This would trigger a warning in
[CheckpointCoordinator:1413](https://github.com/apache/flink/blob/88b309b7dcad269ad084eab5e2944724daf6dee4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1413).
I'd assume that this warning is not needed in this specific case.
Additionally, it would cause checkpoint creation for the current run to fail
even though the `addEntry` call succeeded as expected. No error needs to be
exposed here, and the corresponding checkpoint can be handled as normal.
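A minimal, self-contained sketch of the suggestion above, assuming a simplified in-memory store in place of the fabric8 client and the Kubernetes ConfigMap. `RetryAwareAdd`, `FlakyStore`, `Updater`, and the local `AlreadyExistException` are hypothetical names for illustration, not Flink or fabric8 APIs. The first write commits but its response is "lost", so the same update is re-run; an attempt counter (the same idea as `retryNum` in the diff) lets the retried attempt recognize its own entry and report success instead of surfacing an exception that would fail the checkpoint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryAwareAdd {

    /** Hypothetical stand-in for Flink's AlreadyExistException (not the real class). */
    static class AlreadyExistException extends Exception {
        AlreadyExistException(String message) {
            super(message);
        }
    }

    /** Hypothetical update callback that may throw, similar in spirit to the lambda in the diff. */
    interface Updater {
        boolean apply(Map<String, String> data) throws Exception;
    }

    /**
     * Hypothetical in-memory store: the first committed write "loses" its response,
     * so the client re-runs the same update, mimicking a retried transaction.
     */
    static class FlakyStore {
        final Map<String, String> data = new HashMap<>();
        private boolean dropNextResponse = true;

        boolean update(Updater updater) throws Exception {
            while (true) {
                boolean changed = updater.apply(data);
                if (dropNextResponse) {
                    // The write is already applied, but the caller never sees the result: retry.
                    dropNextResponse = false;
                    continue;
                }
                return changed;
            }
        }
    }

    static boolean addAndLock(FlakyStore store, String key, String value) throws Exception {
        // Lambdas may only capture effectively final locals, hence the AtomicInteger counter.
        final AtomicInteger attempts = new AtomicInteger(0);
        return store.update(
                data -> {
                    attempts.incrementAndGet();
                    String existing = data.get(key);
                    if (existing == null) {
                        data.put(key, value);
                        return true;
                    }
                    // Assumption: on a retry, an entry holding exactly our value means the
                    // earlier attempt committed, so report success instead of an error.
                    if (attempts.get() > 1 && existing.equals(value)) {
                        return true;
                    }
                    throw new AlreadyExistException("Key " + key + " already exists.");
                });
    }

    public static void main(String[] args) throws Exception {
        FlakyStore store = new FlakyStore();
        // The retried attempt finds its own entry and is treated as a success.
        System.out.println(addAndLock(store, "checkpoint-1", "handle-A")); // prints "true"
        try {
            // A genuine conflict on the first attempt still fails.
            addAndLock(store, "checkpoint-1", "handle-B");
        } catch (AlreadyExistException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

Comparing the stored value, rather than relying on the retry count alone as the diff does, is a design choice of this sketch: it keeps a genuine conflict with another writer during a retry from being silently reported as success.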