dmvk commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r814721306
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws Exception {
public boolean releaseAndTryRemove(String key) throws Exception {
checkNotNull(key, "Key in ConfigMap.");
final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer =
new AtomicReference<>();
-
- return kubeClient
- .checkAndUpdateConfigMap(
- configMapName,
+ return updateConfigMap(
configMap -> {
- if (isValidOperation(configMap)) {
- final String content = configMap.getData().remove(key);
- if (content != null) {
- try {
- stateHandleRefer.set(deserializeObject(content));
- } catch (IOException e) {
- LOG.warn(
- "Could not retrieve the state handle of {} from ConfigMap {}.",
- key,
- configMapName,
- e);
+ final String content = configMap.getData().get(key);
+ if (content != null) {
+ try {
+ final StateHandleWithDeleteMarker<T> result =
+ deserializeStateHandle(content);
+ if (!result.isMarkedForDeletion()) {
+ configMap
+ .getData()
+ .put(
+ key,
+ encodeStateHandle(
+ InstantiationUtil.serializeObject(
+ result.toDeleting())));
}
+ stateHandleRefer.set(result.getInner());
+ } catch (IOException e) {
+ LOG.warn(
+ "Could not retrieve the state handle of {} from ConfigMap {}.",
+ key,
+ configMapName,
+ e);
+ // TODO log / comment
+ Objects.requireNonNull(configMap.getData().remove(key));
}
- return Optional.of(configMap);
}
- return Optional.empty();
+ return Optional.of(configMap);
})
- .whenComplete(
- (succeed, ignore) -> {
- if (succeed) {
- if (stateHandleRefer.get() != null) {
- try {
- stateHandleRefer.get().discardState();
- } catch (Exception e) {
- throw new CompletionException(e);
- }
+ .thenCompose(
+ updated -> {
+ // We don't care whether the configmap has been updated or not
Review comment:
I think that's not enough. We should also explain why we can ignore this,
because it's not simple to reason about (I will add this).
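To make the reasoning easier to follow, here is a minimal, self-contained sketch of the "mark as deleting, discard, then remove" flow that the diff appears to introduce. It is not the code from this PR: the class `RemovalTransactionSketch`, the `DELETING:` prefix, the two in-memory maps, and the `failNextDiscard` switch are all hypothetical stand-ins for the ConfigMap, the delete marker, and the external state. The assumption being illustrated is that a retry which finds the entry already marked can simply continue with the discard, which may be one reason the result of the first update does not need to be inspected.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a ConfigMap-backed state handle store. */
class RemovalTransactionSketch {
    /** Hypothetical marker prefix meaning "deletion in progress". */
    static final String DELETING = "DELETING:";

    /** Stand-in for the ConfigMap data: key -> pointer to external state. */
    final Map<String, String> configMapData = new HashMap<>();
    /** Stand-in for the external state that a discard would delete. */
    final Map<String, String> externalState = new HashMap<>();
    /** When true, the next discard fails once (simulated crash or I/O error). */
    boolean failNextDiscard = false;

    void put(String key, String pointer, String payload) {
        configMapData.put(key, pointer);
        externalState.put(pointer, payload);
    }

    /** Returns true if an entry existed and was removed, false if there was none. */
    boolean releaseAndTryRemove(String key) {
        String content = configMapData.get(key);
        if (content == null) {
            return false; // nothing to remove
        }
        // Step 1: start the removal by marking the entry, unless an
        // earlier (failed) attempt has already marked it.
        String pointer = content;
        if (content.startsWith(DELETING)) {
            pointer = content.substring(DELETING.length());
        } else {
            configMapData.put(key, DELETING + content);
        }
        // Step 2: discard the external state. Removing a missing key is a
        // no-op, so repeating this step on a retry is harmless.
        if (failNextDiscard) {
            failNextDiscard = false;
            throw new IllegalStateException("simulated discard failure");
        }
        externalState.remove(pointer);
        // Step 3: commit the removal by dropping the marked entry.
        configMapData.remove(key);
        return true;
    }
}
```

In this sketch a failed discard leaves the entry behind with its marker, so a later call resumes at step 2 instead of losing track of the external state. Whether the real store relies on exactly this argument is something the code comment in the PR should spell out.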