XComp commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630810712
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -196,29 +229,55 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi
RetrievableStateHandle<T> newStateHandle = storage.store(state);
- boolean success = false;
+ final byte[] serializedStateHandle = serializeStateHandle(newStateHandle);
+ // initialize flags to serve the failure case
+ boolean discardOldState = false;
+ boolean discardNewState = true;
try {
- // Serialize the new state handle. This writes the state to the backend.
- byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
+ setStateHandle(path, serializedStateHandle, expectedVersion.getValue());
+
+ // swap subject for deletion in case of success
+ discardOldState = true;
+ discardNewState = false;
+ } catch (Exception e) {
+ if (indicatesPossiblyInconsistentState(e)) {
+ // it's unclear whether the state handle metadata was written to ZooKeeper -
+ // hence, we don't discard any data
+ discardNewState = false;
+ throw new PossibleInconsistentStateException(e);
+ }
- // Replace state handle in ZooKeeper.
- client.setData()
- .withVersion(expectedVersion.getValue())
- .forPath(path, serializedStateHandle);
- success = true;
- } catch (KeeperException.NoNodeException e) {
// We wrap the exception here so that it could be caught in DefaultJobGraphStore
- throw new NotExistException("ZooKeeper node " + path + " does not exist.", e);
+ throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class)
+ .map(
+ nnee ->
+ new NotExistException(
+ "ZooKeeper node " + path + " does not exist.", nnee))
+ .orElseThrow(() -> e);
} finally {
- if (success) {
+ if (discardOldState) {
oldStateHandle.discardState();
- } else {
+ }
+
+ if (discardNewState) {
newStateHandle.discardState();
}
}
}
+ // this method is provided for the sole purpose of easier testing
+ @VisibleForTesting
+ protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion)
+ throws Exception {
+ // Replace state handle in ZooKeeper.
+ client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle);
+ }
+
+ private boolean indicatesPossiblyInconsistentState(Exception e) {
+ return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
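For readers following the flag logic in the `replace` hunk above, here is a minimal standalone model of its three outcomes. On success, only the old handle is discarded. On a definite failure, only the new handle is discarded. On a possibly inconsistent failure, neither is discarded. This is a sketch, not Flink code: `StateHandle`, `MetadataWriter`, `AmbiguousWriteException`, and `PossiblyInconsistentException` are hypothetical stand-ins for `RetrievableStateHandle`, the ZooKeeper `setData()` call, an exception for which `indicatesPossiblyInconsistentState` would return true, and `PossibleInconsistentStateException`. For brevity it classifies failures by exception type rather than by an allow-list.

```java
// Standalone sketch of the discard-flag pattern; every type here is a hypothetical stand-in.
final class DiscardFlagsSketch {

    // Hypothetical stand-in for RetrievableStateHandle: only records whether it was discarded.
    static final class StateHandle {
        boolean discarded;

        void discardState() {
            discarded = true;
        }
    }

    // Hypothetical stand-in for the ZooKeeper setData() call (setStateHandle in the diff).
    interface MetadataWriter {
        void write(StateHandle handle) throws Exception;
    }

    // Hypothetical: thrown when it is unknown whether the write reached the store.
    static final class AmbiguousWriteException extends Exception {
        AmbiguousWriteException(String message) {
            super(message);
        }
    }

    // Hypothetical counterpart of PossibleInconsistentStateException.
    static final class PossiblyInconsistentException extends Exception {
        PossiblyInconsistentException(Throwable cause) {
            super(cause);
        }
    }

    static void replace(StateHandle oldHandle, StateHandle newHandle, MetadataWriter writer)
            throws Exception {
        // Defaults describe a definite failure: keep the old state, drop the new one.
        boolean discardOld = false;
        boolean discardNew = true;
        try {
            writer.write(newHandle);
            // Success: the metadata now references the new state, so the old one can go.
            discardOld = true;
            discardNew = false;
        } catch (AmbiguousWriteException e) {
            // Outcome unknown: either handle may still be referenced, so keep both.
            discardNew = false;
            throw new PossiblyInconsistentException(e);
        } finally {
            // Any other exception propagates unchanged with the failure defaults.
            if (discardOld) {
                oldHandle.discardState();
            }
            if (discardNew) {
                newHandle.discardState();
            }
        }
    }
}
```

Deciding in `finally` from two flags keeps the cleanup in one place. Each exit path only has to set the flags, rather than duplicating `discardState()` calls across the `try` and `catch` branches.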
Review comment:
I would like to keep this explicit: if subclasses are meant to be in that
set, we should add them to it explicitly. Your `AuthExpiredException` example
is hypothetical, isn't it? I couldn't find that class in the code. I went
through the error cases listed in
[KeeperException.create](https://github.com/apache/zookeeper/blob/branch-3.4/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java#L93)
to cover the cases that can be triggered by the `commit()` call.
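To make the trade-off in this comment concrete: `contains(e.getClass())` matches only the exact runtime class. A subclass of a listed exception is therefore treated as possibly inconsistent unless it is added to the set itself. The following sketch contrasts that with an `isInstance`-based check. The exception classes are hypothetical stand-ins loosely modeled on ZooKeeper's `KeeperException.NoNodeException`, `BadVersionException`, and `ConnectionLossException`. They are not the real types, and the set is not the one from the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Standalone sketch; all exception classes are hypothetical stand-ins, not ZooKeeper types.
final class PreCommitCheckSketch {

    static class StoreException extends Exception {}

    static class NoNodeError extends StoreException {}

    static class BadVersionError extends StoreException {}

    // Hypothetical subclass that is deliberately NOT listed in the set below.
    static class SpecialBadVersionError extends BadVersionError {}

    static class ConnectionLossError extends StoreException {}

    // Explicit allow-list of failures assumed to happen before the write is committed.
    static final Set<Class<? extends Exception>> PRE_COMMIT_EXCEPTIONS =
            Collections.unmodifiableSet(
                    new HashSet<Class<? extends Exception>>(
                            Arrays.asList(NoNodeError.class, BadVersionError.class)));

    // Exact-class membership, as in the diff: subclasses must be listed explicitly.
    static boolean indicatesPossiblyInconsistentState(Exception e) {
        return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
    }

    // Alternative: hierarchy-aware membership, which implicitly covers every subclass.
    static boolean indicatesPossiblyInconsistentStateByHierarchy(Exception e) {
        return PRE_COMMIT_EXCEPTIONS.stream().noneMatch(type -> type.isInstance(e));
    }
}
```

With exact matching, an unlisted subclass such as the hypothetical `SpecialBadVersionError` falls into the conservative branch, so no state is discarded. The `isInstance` variant would instead widen the set automatically whenever a new subclass of a listed exception appears.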
--
This is an automated message from the Apache Git Service.