tillrohrmann commented on a change in pull request #15832: URL: https://github.com/apache/flink/pull/15832#discussion_r628363445
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.persistence; + +/** + * {@code PossibleInconsistentStateException} represents errors that might have lead to an + * inconsistent state within the HA resources. + */ +public class PossibleInconsistentStateException extends Exception { Review comment: Shall we let this exception extend from `FlinkException`? ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -114,21 +118,28 @@ public KubernetesStateHandleStore( * @param key Key in ConfigMap * @param state State to be added * @throws AlreadyExistException if the name already exists + * @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This + * indicates that it's not clear whether the new state was successfully written to + * Kubernetes or not. No state was discarded. Proper error handling has to be applied on the + * caller's side. 
* @throws Exception if persisting state or writing state handle failed */ @Override - public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exception { + public RetrievableStateHandle<T> addAndLock(String key, T state) + throws PossibleInconsistentStateException, Exception { checkNotNull(key, "Key in ConfigMap."); checkNotNull(state, "State."); final RetrievableStateHandle<T> storeHandle = storage.store(state); - boolean success = false; + final byte[] serializedStoreHandle = serializeStateHandle(storeHandle); + // initialize flag to serve the failure case + boolean discardState = true; try { - final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle); - success = - kubeClient + // a successful operation will result in the state not being discarded + discardState = + !kubeClient Review comment: I think this is a matter of personal taste: I find it easier to reason about with `success` and without the negation here. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -470,6 +508,26 @@ public String toString() { return this.getClass().getSimpleName() + "{configMapName='" + configMapName + "'}"; } + private byte[] serializeStateHandle(RetrievableStateHandle<T> retrievableStateHandle) + throws Exception { + try { + // Serialize the state handle. This writes the state to the backend. + return InstantiationUtil.serializeObject(retrievableStateHandle); + } catch (Exception e) { + discardState(retrievableStateHandle); Review comment: I think here it would be good to catch a possible exception and add it as a suppressed exception to `e`. Otherwise, we might lose it.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ########## @@ -196,27 +231,74 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi RetrievableStateHandle<T> newStateHandle = storage.store(state); - boolean success = false; + final byte[] serializedStateHandle = serializeStateHandle(newStateHandle); + // initialize flags to serve the failure case + boolean discardOldState = false; + boolean discardNewState = true; try { - // Serialize the new state handle. This writes the state to the backend. - byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle); + setStateHandle(path, serializedStateHandle, expectedVersion.getValue()); + + // swap subject for deletion in case of success + discardOldState = true; + discardNewState = false; + } catch (Exception e) { + if (indicatesPossiblyInconsistentState(e)) { + // it's unclear whether the state handle metadata was written to ZooKeeper - + // hence, we don't discard any data + discardOldState = false; + discardNewState = false; + throw new PossibleInconsistentStateException(e); + } - // Replace state handle in ZooKeeper. 
- client.setData() - .withVersion(expectedVersion.getValue()) - .forPath(path, serializedStateHandle); - success = true; - } catch (KeeperException.NoNodeException e) { // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw new NotExistException("ZooKeeper node " + path + " does not exist.", e); + throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class) + .map( + nnee -> + new NotExistException( + "ZooKeeper node " + path + " does not exist.", nnee)) + .orElseThrow(() -> e); } finally { - if (success) { - oldStateHandle.discardState(); - } else { - newStateHandle.discardState(); + if (discardOldState) { + discardState(oldStateHandle); } + + if (discardNewState) { + discardState(newStateHandle); + } + } + } + + // this method is provided for the sole purpose of easier testing + @VisibleForTesting + protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) + throws Exception { + // Replace state handle in ZooKeeper. + client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle); + } + + private static void discardState(@Nullable StateObject stateObject) throws Exception { Review comment: `@Nullable` required? ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -470,6 +508,26 @@ public String toString() { return this.getClass().getSimpleName() + "{configMapName='" + configMapName + "'}"; } + private byte[] serializeStateHandle(RetrievableStateHandle<T> retrievableStateHandle) + throws Exception { + try { + // Serialize the state handle. This writes the state to the backend. 
+ return InstantiationUtil.serializeObject(retrievableStateHandle); + } catch (Exception e) { + discardState(retrievableStateHandle); + ExceptionUtils.rethrow(e); + } + + // will never happen but is added to please the compiler + return new byte[0]; + } + + private static void discardState(@Nullable StateObject stateObject) throws Exception { Review comment: Is `@Nullable` required? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
