XComp commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630799380
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -126,56 +144,71 @@ public ZooKeeperStateHandleStore(
* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet)
* @param state State to be added
* @return The Created {@link RetrievableStateHandle}.
+ * @throws PossibleInconsistentStateException if the write-to-ZooKeeper operation failed. This
+ * indicates that it's not clear whether the new state was successfully written to ZooKeeper
+ * or not. Proper error handling has to be applied on the caller's side.
* @throws Exception If a ZooKeeper or state handle operation fails
*/
@Override
- public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws Exception {
+ public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state)
+ throws PossibleInconsistentStateException, Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
final String path = normalizePath(pathInZooKeeper);
RetrievableStateHandle<T> storeHandle = storage.store(state);
- boolean success = false;
+ byte[] serializedStoreHandle = serializeStateHandle(storeHandle);
+ // initialize flag to serve the failure case
+ boolean discardState = true;
try {
- // Serialize the state handle. This writes the state to the backend.
- byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
-
- // Write state handle (not the actual state) to ZooKeeper. This is expected to be
- // smaller than the state itself. This level of indirection makes sure that data in
- // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
- // the state can be larger.
- // Create the lock node in a transaction with the actual state node. That way we can
- // prevent
- // race conditions with a concurrent delete operation.
- client.inTransaction()
- .create()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, serializedStoreHandle)
- .and()
- .create()
- .withMode(CreateMode.EPHEMERAL)
- .forPath(getLockPath(path))
- .and()
- .commit();
-
- success = true;
+ writeStoreHandleTransactionally(path, serializedStoreHandle);
+ discardState = false;
return storeHandle;
- } catch (KeeperException.NodeExistsException e) {
+ } catch (Exception e) {
+ if (indicatesPossiblyInconsistentState(e)) {
+ discardState = false;
+ throw new PossibleInconsistentStateException(e);
+ }
+
// We wrap the exception here so that it could be caught in DefaultJobGraphStore
- throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e);
+ throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
+ .map(
+ nee ->
+ new AlreadyExistException(
+ "ZooKeeper node " + path + " already exists.", nee))
+ .orElseThrow(() -> e);
} finally {
- if (!success) {
- // Cleanup the state handle if it was not written to ZooKeeper.
- if (storeHandle != null) {
- storeHandle.discardState();
- }
+ if (discardState) {
+ storeHandle.discardState();
Review comment:
That's a good catch. Thanks for the idea. I will apply it. 👍
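
For readers following the thread, the cleanup logic in the hunk above can be sketched in isolation. This is only a rough illustration, not the code in the PR: `DiscardFlagSketch`, `UnknownOutcomeException`, `WriteStep`, and `writeOrCleanUp` are hypothetical stand-ins for the Flink types and methods quoted in the diff. The point is that the flag starts out as `true`, so any failure path that does not explicitly clear it ends with the stored state being discarded, while success and the "outcome unknown" case both keep it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical stand-in names throughout; none of these are Flink classes.
class DiscardFlagSketch {

    /** Signals that it is unclear whether the write took effect. */
    static class UnknownOutcomeException extends Exception {
        UnknownOutcomeException(Throwable cause) {
            super(cause);
        }
    }

    /** A write step that may fail with any exception. */
    interface WriteStep {
        void run() throws Exception;
    }

    static void writeOrCleanUp(
            WriteStep write, Runnable discard, Predicate<Exception> outcomeUnknown)
            throws Exception {
        // Default to cleanup so every unhandled failure path discards the state.
        boolean discardState = true;
        try {
            write.run();
            discardState = false; // success: the state is now referenced
            return;
        } catch (Exception e) {
            if (outcomeUnknown.test(e)) {
                // The write may have succeeded, so the state must be kept.
                discardState = false;
                throw new UnknownOutcomeException(e);
            }
            throw e; // definite failure: the finally block cleans up
        } finally {
            if (discardState) {
                discard.run();
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger discards = new AtomicInteger();
        try {
            writeOrCleanUp(
                    () -> {
                        throw new IllegalStateException("definite failure");
                    },
                    discards::incrementAndGet,
                    ex -> false);
        } catch (Exception e) {
            System.out.println("failed: " + e.getMessage() + ", discards=" + discards.get());
        }
    }
}
```

In this sketch the predicate decides which exceptions count as ambiguous; in the diff that role is played by `indicatesPossiblyInconsistentState(e)`.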