tillrohrmann commented on a change in pull request #17607: URL: https://github.com/apache/flink/pull/17607#discussion_r743521497
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ########## @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore( throws PossibleInconsistentStateException, Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - final String path = normalizePath(pathInZooKeeper); + try { + try { + // Obtain the write-lock. + client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path)); + } catch (KeeperException.NodeExistsException e) { + // There should always be a single JobMaster for each job, so we should never + // encounter this. + throw new IllegalStateException( + "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.", + e); + } + if (exists(path).isExisting()) { + throw new AlreadyExistException( + String.format("ZooKeeper node %s already exists.", path)); + } + return doAddAndLock(path, state); + } finally { + // Release the write-lock. + deleteIfExists(getWriteLockPath(path)); + } + } - RetrievableStateHandle<T> storeHandle = storage.store(state); - - byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); - + private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception { + final RetrievableStateHandle<T> storeHandle = storage.store(state); + final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); try { writeStoreHandleTransactionally(path, serializedStoreHandle); return storeHandle; } catch (Exception e) { if (indicatesPossiblyInconsistentState(e)) { throw new PossibleInconsistentStateException(e); } - - // in any other failure case: discard the state + // In case of any other failure, discard the state and rethrow the exception. 
storeHandle.discardState(); - - // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class) - .map( - nee -> - new AlreadyExistException( - "ZooKeeper node " + path + " already exists.", nee)) - .orElseThrow(() -> e); + throw e; } } // this method is provided for the sole purpose of easier testing @VisibleForTesting protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle) throws Exception { - // Write state handle (not the actual state) to ZooKeeper. This is expected to be - // smaller than the state itself. This level of indirection makes sure that data in - // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but - // the state can be larger. - // Create the lock node in a transaction with the actual state node. That way we can + // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller + // than the state itself. This level of indirection makes sure that data in ZooKeeper is + // small, because ZooKeeper is designed for data in the KB range, but the state can be + // larger. Create the lock node in a transaction with the actual state node. That way we can // prevent race conditions with a concurrent delete operation. 
- client.inTransaction() - .create() - .withMode(CreateMode.PERSISTENT) - .forPath(path, serializedStoreHandle) - .and() - .create() - .withMode(CreateMode.EPHEMERAL) - .forPath(getLockPath(path)) - .and() - .commit(); + while (true) { + try { + client.inTransaction() + .create() + .withMode(CreateMode.PERSISTENT) + .forPath(path, serializedStoreHandle) + .and() + .create() + .withMode(CreateMode.EPHEMERAL) + .forPath(getLockPath(path)) + .and() + .commit(); + break; + } catch (KeeperException.NodeExistsException e) { + // Transactions are not idempotent in the curator version we're currently using, so + // it is actually possible that we've re-tried a transaction that has already + // succeeded. We've ensured that the node hasn't been present prior executing the + // transaction, so we're pretty confident that this is a result of the retry + // mechanism. Review comment: If we catch a `NodeExistsException`, then we will retry the loop. Won't this cause another `NodeExistsException`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ########## @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore( throws PossibleInconsistentStateException, Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - final String path = normalizePath(pathInZooKeeper); + try { + try { + // Obtain the write-lock. + client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path)); + } catch (KeeperException.NodeExistsException e) { + // There should always be a single JobMaster for each job, so we should never + // encounter this. + throw new IllegalStateException( + "Write lock is already taken, which signals there are multiple processes writing to the same path. 
This should never happen.", + e); + } + if (exists(path).isExisting()) { + throw new AlreadyExistException( + String.format("ZooKeeper node %s already exists.", path)); + } + return doAddAndLock(path, state); + } finally { + // Release the write-lock. + deleteIfExists(getWriteLockPath(path)); + } + } - RetrievableStateHandle<T> storeHandle = storage.store(state); - - byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); - + private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception { + final RetrievableStateHandle<T> storeHandle = storage.store(state); + final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); try { writeStoreHandleTransactionally(path, serializedStoreHandle); return storeHandle; } catch (Exception e) { if (indicatesPossiblyInconsistentState(e)) { throw new PossibleInconsistentStateException(e); } - - // in any other failure case: discard the state + // In case of any other failure, discard the state and rethrow the exception. storeHandle.discardState(); - - // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class) - .map( - nee -> - new AlreadyExistException( - "ZooKeeper node " + path + " already exists.", nee)) - .orElseThrow(() -> e); + throw e; } } // this method is provided for the sole purpose of easier testing @VisibleForTesting protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle) throws Exception { - // Write state handle (not the actual state) to ZooKeeper. This is expected to be - // smaller than the state itself. This level of indirection makes sure that data in - // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but - // the state can be larger. - // Create the lock node in a transaction with the actual state node. That way we can + // Write state handle (not the actual state) to ZooKeeper. 
This is expected to be smaller + // than the state itself. This level of indirection makes sure that data in ZooKeeper is + // small, because ZooKeeper is designed for data in the KB range, but the state can be + // larger. Create the lock node in a transaction with the actual state node. That way we can // prevent race conditions with a concurrent delete operation. - client.inTransaction() - .create() - .withMode(CreateMode.PERSISTENT) - .forPath(path, serializedStoreHandle) - .and() - .create() - .withMode(CreateMode.EPHEMERAL) - .forPath(getLockPath(path)) - .and() - .commit(); + while (true) { + try { + client.inTransaction() + .create() + .withMode(CreateMode.PERSISTENT) + .forPath(path, serializedStoreHandle) + .and() + .create() + .withMode(CreateMode.EPHEMERAL) + .forPath(getLockPath(path)) + .and() + .commit(); + break; + } catch (KeeperException.NodeExistsException e) { Review comment: We could also read the contents of `path` to see whether we have written the `serializedStoreHandle`. If this is the case, then the call succeeded and otherwise we can throw the `NodeExistsException`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ########## @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore( throws PossibleInconsistentStateException, Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - final String path = normalizePath(pathInZooKeeper); + try { + try { + // Obtain the write-lock. + client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path)); Review comment: Won't this call have the same problem we are trying to solve with this PR? Put differently, won't we have a problem if this call retries and the first call already succeeded?
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ########## @@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore( throws PossibleInconsistentStateException, Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - final String path = normalizePath(pathInZooKeeper); + try { + try { + // Obtain the write-lock. + client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path)); + } catch (KeeperException.NodeExistsException e) { + // There should always be a single JobMaster for each job, so we should never + // encounter this. + throw new IllegalStateException( + "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.", + e); + } + if (exists(path).isExisting()) { + throw new AlreadyExistException( + String.format("ZooKeeper node %s already exists.", path)); + } + return doAddAndLock(path, state); + } finally { + // Release the write-lock. + deleteIfExists(getWriteLockPath(path)); + } + } - RetrievableStateHandle<T> storeHandle = storage.store(state); - - byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); - + private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception { + final RetrievableStateHandle<T> storeHandle = storage.store(state); + final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); try { writeStoreHandleTransactionally(path, serializedStoreHandle); return storeHandle; } catch (Exception e) { if (indicatesPossiblyInconsistentState(e)) { throw new PossibleInconsistentStateException(e); } - - // in any other failure case: discard the state + // In case of any other failure, discard the state and rethrow the exception. 
storeHandle.discardState(); - - // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class) - .map( - nee -> - new AlreadyExistException( - "ZooKeeper node " + path + " already exists.", nee)) - .orElseThrow(() -> e); + throw e; } } // this method is provided for the sole purpose of easier testing @VisibleForTesting protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle) throws Exception { - // Write state handle (not the actual state) to ZooKeeper. This is expected to be - // smaller than the state itself. This level of indirection makes sure that data in - // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but - // the state can be larger. - // Create the lock node in a transaction with the actual state node. That way we can + // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller + // than the state itself. This level of indirection makes sure that data in ZooKeeper is + // small, because ZooKeeper is designed for data in the KB range, but the state can be + // larger. Create the lock node in a transaction with the actual state node. That way we can // prevent race conditions with a concurrent delete operation. 
- client.inTransaction() - .create() - .withMode(CreateMode.PERSISTENT) - .forPath(path, serializedStoreHandle) - .and() - .create() - .withMode(CreateMode.EPHEMERAL) - .forPath(getLockPath(path)) - .and() - .commit(); + while (true) { + try { + client.inTransaction() + .create() + .withMode(CreateMode.PERSISTENT) + .forPath(path, serializedStoreHandle) + .and() + .create() + .withMode(CreateMode.EPHEMERAL) + .forPath(getLockPath(path)) + .and() + .commit(); + break; + } catch (KeeperException.NodeExistsException e) { + // Transactions are not idempotent in the curator version we're currently using, so + // it is actually possible that we've re-tried a transaction that has already + // succeeded. We've ensured that the node hasn't been present prior executing the + // transaction, so we're pretty confident that this is a result of the retry + // mechanism. Review comment: Maybe adding a test for this method could be helpful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org