rkhachatryan commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630164743
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -126,56 +144,71 @@ public ZooKeeperStateHandleStore(
* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not*
exist yet)
* @param state State to be added
* @return The Created {@link RetrievableStateHandle}.
+ * @throws PossibleInconsistentStateException if the write-to-ZooKeeper
operation failed. This
+ * indicates that it's not clear whether the new state was
successfully written to ZooKeeper
+ * or not. Proper error handling has to be applied on the caller's
side.
* @throws Exception If a ZooKeeper or state handle operation fails
*/
@Override
- public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T
state) throws Exception {
+ public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T
state)
+ throws PossibleInconsistentStateException, Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
final String path = normalizePath(pathInZooKeeper);
RetrievableStateHandle<T> storeHandle = storage.store(state);
- boolean success = false;
+ byte[] serializedStoreHandle = serializeStateHandle(storeHandle);
+ // initialize flag to serve the failure case
+ boolean discardState = true;
try {
- // Serialize the state handle. This writes the state to the
backend.
- byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
-
- // Write state handle (not the actual state) to ZooKeeper. This is
expected to be
- // smaller than the state itself. This level of indirection makes
sure that data in
- // ZooKeeper is small, because ZooKeeper is designed for data in
the KB range, but
- // the state can be larger.
- // Create the lock node in a transaction with the actual state
node. That way we can
- // prevent
- // race conditions with a concurrent delete operation.
- client.inTransaction()
- .create()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, serializedStoreHandle)
- .and()
- .create()
- .withMode(CreateMode.EPHEMERAL)
- .forPath(getLockPath(path))
- .and()
- .commit();
-
- success = true;
+ writeStoreHandleTransactionally(path, serializedStoreHandle);
+ discardState = false;
return storeHandle;
- } catch (KeeperException.NodeExistsException e) {
+ } catch (Exception e) {
+ if (indicatesPossiblyInconsistentState(e)) {
+ discardState = false;
+ throw new PossibleInconsistentStateException(e);
+ }
+
// We wrap the exception here so that it could be caught in
DefaultJobGraphStore
- throw new AlreadyExistException("ZooKeeper node " + path + "
already exists.", e);
+ throw ExceptionUtils.findThrowable(e,
KeeperException.NodeExistsException.class)
Review comment:
Is this line reachable? In `indicatesPossiblyInconsistentState` above,
we already checked for `NodeExistsException` and threw an exception if it was
found.
I think it's better to place this check before the
`indicatesPossiblyInconsistentState` call (rather than updating the map).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -126,56 +144,71 @@ public ZooKeeperStateHandleStore(
* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not*
exist yet)
* @param state State to be added
* @return The Created {@link RetrievableStateHandle}.
+ * @throws PossibleInconsistentStateException if the write-to-ZooKeeper
operation failed. This
+ * indicates that it's not clear whether the new state was
successfully written to ZooKeeper
+ * or not. Proper error handling has to be applied on the caller's
side.
* @throws Exception If a ZooKeeper or state handle operation fails
*/
@Override
- public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T
state) throws Exception {
+ public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T
state)
+ throws PossibleInconsistentStateException, Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
final String path = normalizePath(pathInZooKeeper);
RetrievableStateHandle<T> storeHandle = storage.store(state);
- boolean success = false;
+ byte[] serializedStoreHandle = serializeStateHandle(storeHandle);
+ // initialize flag to serve the failure case
+ boolean discardState = true;
try {
- // Serialize the state handle. This writes the state to the
backend.
- byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
-
- // Write state handle (not the actual state) to ZooKeeper. This is
expected to be
- // smaller than the state itself. This level of indirection makes
sure that data in
- // ZooKeeper is small, because ZooKeeper is designed for data in
the KB range, but
- // the state can be larger.
- // Create the lock node in a transaction with the actual state
node. That way we can
- // prevent
- // race conditions with a concurrent delete operation.
- client.inTransaction()
- .create()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, serializedStoreHandle)
- .and()
- .create()
- .withMode(CreateMode.EPHEMERAL)
- .forPath(getLockPath(path))
- .and()
- .commit();
-
- success = true;
+ writeStoreHandleTransactionally(path, serializedStoreHandle);
+ discardState = false;
return storeHandle;
- } catch (KeeperException.NodeExistsException e) {
+ } catch (Exception e) {
+ if (indicatesPossiblyInconsistentState(e)) {
+ discardState = false;
+ throw new PossibleInconsistentStateException(e);
+ }
+
// We wrap the exception here so that it could be caught in
DefaultJobGraphStore
- throw new AlreadyExistException("ZooKeeper node " + path + "
already exists.", e);
+ throw ExceptionUtils.findThrowable(e,
KeeperException.NodeExistsException.class)
+ .map(
+ nee ->
+ new AlreadyExistException(
+ "ZooKeeper node " + path + "
already exists.", nee))
+ .orElseThrow(() -> e);
} finally {
- if (!success) {
- // Cleanup the state handle if it was not written to ZooKeeper.
- if (storeHandle != null) {
- storeHandle.discardState();
- }
+ if (discardState) {
+ storeHandle.discardState();
Review comment:
I think we could get rid of a boolean flag and simplify code a bit by
discarding the handle in the `catch` block instead of `finally`, something like
this:
```
try {
writeStoreHandleTransactionally();
} catch (Exception e) {
if (indicatesPossiblyInconsistentState) {
throw PossibleInconsistentStateException;
} else {
storeHandle.discardState();
}
}
```
WDYT?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/StateHandleStoreUtils.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * {@code StateHandleStoreUtils} collects utility methods that might be
usefule for any {@link
+ * StateHandleStore} implementation.
+ */
+public class StateHandleStoreUtils {
+
+ /**
+ * Serializes the passed {@link StateObject} and discards the state in
case of failure.
+ *
+ * @param stateObject the {@code StateObject} that shall be serialized.
+ * @return The serialized version of the passed {@code StateObject}.
+ * @throws Exception if an error occurred during the serialization. The
corresponding {@code
+ * StateObject} will be discarded in that case.
+ */
+ public static byte[] serializeStateHandle(StateObject stateObject) throws
Exception {
Review comment:
I think it would be more consistent to have a symmetric method in this
class to deserialize the handle.
`ZooKeeperStateHandleStore` currently calls
`InstantiationUtil.deserializeObject` for that.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/StateHandleStoreUtils.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * {@code StateHandleStoreUtils} collects utility methods that might be
usefule for any {@link
+ * StateHandleStore} implementation.
+ */
+public class StateHandleStoreUtils {
+
+ /**
+ * Serializes the passed {@link StateObject} and discards the state in
case of failure.
+ *
+ * @param stateObject the {@code StateObject} that shall be serialized.
+ * @return The serialized version of the passed {@code StateObject}.
+ * @throws Exception if an error occurred during the serialization. The
corresponding {@code
+ * StateObject} will be discarded in that case.
+ */
+ public static byte[] serializeStateHandle(StateObject stateObject) throws
Exception {
+ try {
+ return InstantiationUtil.serializeObject(stateObject);
+ } catch (Exception e) {
+ try {
+ stateObject.discardState();
Review comment:
**nit**: this contract isn't very obvious to me, maybe it makes sense to
rename the method to `serializeOrDiscard` or similar
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -196,29 +229,55 @@ public void replace(String pathInZooKeeper,
IntegerResourceVersion expectedVersi
RetrievableStateHandle<T> newStateHandle = storage.store(state);
- boolean success = false;
+ final byte[] serializedStateHandle =
serializeStateHandle(newStateHandle);
+ // initialize flags to serve the failure case
+ boolean discardOldState = false;
+ boolean discardNewState = true;
try {
- // Serialize the new state handle. This writes the state to the
backend.
- byte[] serializedStateHandle =
InstantiationUtil.serializeObject(newStateHandle);
+ setStateHandle(path, serializedStateHandle,
expectedVersion.getValue());
+
+ // swap subject for deletion in case of success
+ discardOldState = true;
+ discardNewState = false;
+ } catch (Exception e) {
+ if (indicatesPossiblyInconsistentState(e)) {
+ // it's unclear whether the state handle metadata was written
to ZooKeeper -
+ // hence, we don't discard any data
+ discardNewState = false;
+ throw new PossibleInconsistentStateException(e);
+ }
- // Replace state handle in ZooKeeper.
- client.setData()
- .withVersion(expectedVersion.getValue())
- .forPath(path, serializedStateHandle);
- success = true;
- } catch (KeeperException.NoNodeException e) {
// We wrap the exception here so that it could be caught in
DefaultJobGraphStore
- throw new NotExistException("ZooKeeper node " + path + " does not
exist.", e);
+ throw ExceptionUtils.findThrowable(e,
KeeperException.NoNodeException.class)
+ .map(
+ nnee ->
+ new NotExistException(
+ "ZooKeeper node " + path + " does
not exist.", nnee))
+ .orElseThrow(() -> e);
} finally {
- if (success) {
+ if (discardOldState) {
oldStateHandle.discardState();
- } else {
+ }
+
+ if (discardNewState) {
newStateHandle.discardState();
}
}
}
+ // this method is provided for the sole purpose of easier testing
+ @VisibleForTesting
+ protected void setStateHandle(String path, byte[] serializedStateHandle,
int expectedVersion)
+ throws Exception {
+ // Replace state handle in ZooKeeper.
+ client.setData().withVersion(expectedVersion).forPath(path,
serializedStateHandle);
+ }
+
+ private boolean indicatesPossiblyInconsistentState(Exception e) {
+ return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
Review comment:
Checking for the exact class means that no subclass will match, e.g. if
`NoAuthException` were extended by `AuthExpiredException`.
I'm not sure whether this is a real issue though.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
##########
@@ -160,8 +177,94 @@ public void testFailingCompletedCheckpointStoreAdd()
throws Exception {
.discardState();
}
+ @Test
+ public void testCleanupForGenericFailure() throws Exception {
+ testStoringFailureHandling(new FlinkRuntimeException("Expected
exception"), 1);
+ }
+
+ @Test
+ public void testCleanupOmissionForPossibleInconsistentStateException()
throws Exception {
+ testStoringFailureHandling(new PossibleInconsistentStateException(),
0);
+ }
+
+ private void testStoringFailureHandling(Exception failure, int
expectedCleanupCalls)
+ throws Exception {
+ final JobVertexID jobVertexID1 = new JobVertexID();
+
+ final ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID1)
+ .build();
+
+ final ExecutionVertex vertex =
graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
+ final ExecutionAttemptID attemptId =
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+ final StandaloneCheckpointIDCounter checkpointIDCounter =
+ new StandaloneCheckpointIDCounter();
+
+ final ManuallyTriggeredScheduledExecutor
manuallyTriggeredScheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+
+ final CompletedCheckpointStore completedCheckpointStore =
+ new FailingCompletedCheckpointStore(
+ (checkpoint, ignoredCleaner, ignoredPostCleanCallback)
-> {
+ throw failure;
+ });
+
+ final AtomicInteger cleanupCallCount = new AtomicInteger(0);
+ final CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setCheckpointIDCounter(checkpointIDCounter)
+ .setCheckpointsCleaner(
+ new CheckpointsCleaner() {
+
+ private static final long serialVersionUID
=
+ 2029876992397573325L;
+
+ @Override
+ public void cleanCheckpointOnFailedStoring(
+ CompletedCheckpoint
completedCheckpoint,
+ Executor executor) {
+ cleanupCallCount.incrementAndGet();
+ super.cleanCheckpointOnFailedStoring(
+ completedCheckpoint, executor);
+ }
+ })
+ .setCompletedCheckpointStore(completedCheckpointStore)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build();
+
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ try {
+ checkpointCoordinator.receiveAcknowledgeMessage(
+ new AcknowledgeCheckpoint(
+ graph.getJobID(), attemptId,
checkpointIDCounter.getLast()),
+ "unknown location");
+ fail("CheckpointException should have been thrown.");
+ } catch (CheckpointException e) {
+ assertThat(
+ e.getCheckpointFailureReason(),
+ is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+ }
+
+ assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+ }
+
private static final class FailingCompletedCheckpointStore implements
CompletedCheckpointStore {
+ private final TriConsumerWithException<
+ CompletedCheckpoint, CheckpointsCleaner, Runnable,
Exception>
+ addCheckpointConsumer;
+
+ public FailingCompletedCheckpointStore(
+ TriConsumerWithException<
+ CompletedCheckpoint, CheckpointsCleaner,
Runnable, Exception>
+ addCheckpointConsumer) {
+ this.addCheckpointConsumer = addCheckpointConsumer;
Review comment:
I'm not sure whether the flexibility we get here with TriConsumer
justifies the complexity added.
Wouldn't a field `private Exception failure;` passed to the constructor
be sufficient?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]