pnowojski commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r766826678
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1084,6 +1083,15 @@ public boolean receiveAcknowledgeMessage(
final PendingCheckpoint checkpoint =
pendingCheckpoints.get(checkpointId);
+ if (message.getSubtaskState() != null) {
+ // Register shared state regardless of checkpoint state and task ACK state.
+ // This way, shared state is
+ // 1. kept if the message is late or state will be used by the task otherwise
+ // 2. removed eventually upon checkpoint subsumption (or job cancellation)
+ message.getSubtaskState()
+ .registerSharedStates(completedCheckpointStore.getRegistry(), checkpointId);
+ }
+
Review comment:
Is this a valid change with the current (from the perspective of this
commit) reference counting method?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -95,48 +100,36 @@ public Result registerReference(
state,
entry.stateHandle);
}
- entry.increaseReferenceCount();
+ entry.lastUsedCheckpointID = Math.max(checkpointID, entry.lastUsedCheckpointID);
Review comment:
nitty nit: I would extract this into a method inside `SharedStateEntry` for
the sake of encapsulation.
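A minimal sketch of what that extraction could look like. It assumes a simplified `SharedStateEntry` that keeps only the `lastUsedCheckpointID` field from the diff (the state handle and other fields are omitted), and the method name `advanceLastUsedCheckpointID` is hypothetical.

```java
/** Simplified stand-in for SharedStateEntry; not Flink's actual class. */
public class SharedStateEntryDemo {

    static final class SharedStateEntry {
        // Private, so the "never move backwards" rule lives in one place.
        private long lastUsedCheckpointID;

        SharedStateEntry(long checkpointID) {
            this.lastUsedCheckpointID = checkpointID;
        }

        /** Hypothetical helper: records a use without ever decreasing the ID. */
        void advanceLastUsedCheckpointID(long checkpointID) {
            lastUsedCheckpointID = Math.max(checkpointID, lastUsedCheckpointID);
        }

        long getLastUsedCheckpointID() {
            return lastUsedCheckpointID;
        }
    }

    public static void main(String[] args) {
        SharedStateEntry entry = new SharedStateEntry(3L);
        entry.advanceLastUsedCheckpointID(5L);
        entry.advanceLastUsedCheckpointID(4L); // a late ACK does not move it back
        System.out.println(entry.getLastUsedCheckpointID()); // prints 5
    }
}
```

The call site in `registerReference` would then shrink to `entry.advanceLastUsedCheckpointID(checkpointID);`.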
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
##########
@@ -69,6 +69,11 @@ public SharedStateRegistry getSharedStateRegistry() {
throw unsupportedOperationException();
Review comment:
Is the javadoc of this class incorrect?
```
* This class represents a {@link CompletedCheckpointStore} if checkpointing has been enabled.
```
Shouldn't it be
```
* This class represents a {@link CompletedCheckpointStore} if checkpointing has been disabled.
```
?
Note that adding the `getRegistry()` method also breaks that Javadoc even
further.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -142,6 +145,49 @@
/** Tests for the checkpoint coordinator. */
public class CheckpointCoordinatorTest extends TestLogger {
+ @Test
+ public void testSharedStateNotDiscaredOnAbort() throws Exception {
+ JobVertexID v1 = new JobVertexID(), v2 = new JobVertexID();
+
+ ExecutionGraph graph =
+ new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(v1)
+ .addJobVertex(v2)
+ .build();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build();
+ coordinator.startCheckpointScheduler();
+
+ CompletableFuture<CompletedCheckpoint> cpFuture = coordinator.triggerCheckpoint(true);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ cpFuture.getNow(null);
+
+ TestingStreamStateHandle metaState = handle();
+ TestingStreamStateHandle privateState = handle();
+ TestingStreamStateHandle sharedState = handle();
+
+ ack(1L, coordinator, v1, graph, metaState, privateState, sharedState);
+ nack(1L, coordinator, v2, graph);
Review comment:
Can you rename the methods to something more descriptive, such as
`acknowledgeCheckpoint()` and `declineCheckpoint()`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
##########
@@ -113,4 +113,6 @@ default long getLatestCheckpointId() {
/** Returns the {@link SharedStateRegistry} used to register the shared state. */
SharedStateRegistry getSharedStateRegistry();
+
+ SharedStateRegistry getRegistry();
Review comment:
Why have both? They appear to return the same thing, except in
`DeactivatedCheckpointCompletedCheckpointStore`.
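A minimal sketch of the single-accessor alternative, using simplified stand-in types: the `SharedStateRegistry`, `CompletedCheckpointStore`, `InMemoryRegistry` and `StandaloneStore` below are hypothetical and much smaller than Flink's. The store exposes one getter, and every caller goes through it.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-ins; these are not Flink's actual types. */
public class SingleAccessorDemo {

    interface SharedStateRegistry {
        void register(String key, long checkpointID);
    }

    interface CompletedCheckpointStore {
        /** The only accessor; callers no longer choose between two getters. */
        SharedStateRegistry getSharedStateRegistry();
    }

    static final class InMemoryRegistry implements SharedStateRegistry {
        final Map<String, Long> lastUsed = new HashMap<>();

        @Override
        public void register(String key, long checkpointID) {
            // Keep the highest checkpoint ID that referenced this key.
            lastUsed.merge(key, checkpointID, Long::max);
        }
    }

    static final class StandaloneStore implements CompletedCheckpointStore {
        private final SharedStateRegistry registry = new InMemoryRegistry();

        @Override
        public SharedStateRegistry getSharedStateRegistry() {
            return registry;
        }
    }

    public static void main(String[] args) {
        CompletedCheckpointStore store = new StandaloneStore();
        // Callers such as the coordinator would go through this one accessor.
        store.getSharedStateRegistry().register("shared-1", 1L);
        store.getSharedStateRegistry().register("shared-1", 2L);
        InMemoryRegistry registry = (InMemoryRegistry) store.getSharedStateRegistry();
        System.out.println(registry.lastUsed.get("shared-1")); // prints 2
    }
}
```

The only remaining design question is then what the deactivated store's implementation of that one getter should do.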
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
##########
@@ -69,6 +69,11 @@ public SharedStateRegistry getSharedStateRegistry() {
throw unsupportedOperationException();
}
+ @Override
+ public SharedStateRegistry getRegistry() {
+ return SharedStateRegistry.NO_OP;
+ }
+
Review comment:
Why do we need this `NO_OP` value here? Why not throw an exception like
in the existing
`DeactivatedCheckpointCompletedCheckpointStore#getSharedStateRegistry`?
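A minimal sketch contrasting the two options, with hypothetical stand-in types (not Flink's): a `NO_OP` null-object registry that silently drops registrations, as the new `getRegistry()` does, versus failing fast with `UnsupportedOperationException`, as the existing getter does via `unsupportedOperationException()`.

```java
/** Simplified stand-ins; these are not Flink's actual types. */
public class DeactivatedStoreDemo {

    interface SharedStateRegistry {
        void register(String key, long checkpointID);

        /** Null object: accepts and discards every registration. */
        SharedStateRegistry NO_OP = (key, checkpointID) -> {};
    }

    /** Option A (as in the diff): hand out a registry that ignores everything. */
    static final class NoOpDeactivatedStore {
        SharedStateRegistry getSharedStateRegistry() {
            return SharedStateRegistry.NO_OP;
        }
    }

    /** Option B (as the existing getter): fail fast on any access. */
    static final class FailFastDeactivatedStore {
        SharedStateRegistry getSharedStateRegistry() {
            throw new UnsupportedOperationException(
                    "Checkpointing is disabled; there is no shared state registry.");
        }
    }

    public static void main(String[] args) {
        // Option A: the call succeeds, so an unexpected caller goes unnoticed.
        new NoOpDeactivatedStore().getSharedStateRegistry().register("shared-1", 1L);
        System.out.println("NO_OP accepted the registration silently");

        // Option B: the same unexpected call surfaces immediately.
        try {
            new FailFastDeactivatedStore().getSharedStateRegistry().register("shared-1", 1L);
        } catch (UnsupportedOperationException e) {
            System.out.println("fail-fast: " + e.getMessage());
        }
    }
}
```

`NO_OP` only pays off if some caller legitimately reaches the registry while checkpointing is disabled; otherwise the fail-fast variant surfaces such a call as a bug.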
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -95,48 +100,36 @@ public Result registerReference(
state,
entry.stateHandle);
}
- entry.increaseReferenceCount();
+ entry.lastUsedCheckpointID = Math.max(checkpointID, entry.lastUsedCheckpointID);
}
}
scheduleAsyncDelete(scheduledStateDeletion);
LOG.trace("Registered shared state {} under key {}.", entry,
registrationKey);
- return new Result(entry);
+ return entry.stateHandle;
}
@Override
- public Result unregisterReference(SharedStateRegistryKey registrationKey) {
-
- checkNotNull(registrationKey);
-
- final Result result;
- final StreamStateHandle scheduledStateDeletion;
- SharedStateEntry entry;
-
+ public void unregisterUnusedState(long lowestCheckpointID) {
+ LOG.debug(
+ "Discard state created before checkpoint {} and not used afterwards",
+ lowestCheckpointID);
+ List<StreamStateHandle> subsumed = new ArrayList<>();
synchronized (registeredStates) {
- entry = registeredStates.get(registrationKey);
-
- checkState(
- entry != null,
- "Cannot unregister a state that is not registered: %s",
- registrationKey);
-
- entry.decreaseReferenceCount();
-
- // Remove the state from the registry when it's not referenced any more.
- if (entry.getReferenceCount() <= 0) {
- registeredStates.remove(registrationKey);
- scheduledStateDeletion = entry.getStateHandle();
- result = new Result(null, 0);
- } else {
- scheduledStateDeletion = null;
- result = new Result(entry);
+ Iterator<SharedStateEntry> it = registeredStates.values().iterator();
+ while (it.hasNext()) {
+ SharedStateEntry entry = it.next();
+ if (entry.lastUsedCheckpointID < lowestCheckpointID) {
+ subsumed.add(entry.stateHandle);
+ it.remove();
+ }
Review comment:
Can you add a comment highlighting and justifying that you are
iterating over all of the state handles here?
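A minimal sketch of the kind of comment being asked for, on a simplified model of the registry in the diff: `String` keys and handles stand in for `SharedStateRegistryKey` and `StreamStateHandle`, and the handles are returned instead of being scheduled for async deletion. The justification written in the comment is an assumed rationale, not necessarily the author's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Simplified model of the registry in the diff; not Flink's actual class. */
public class UnusedStateCleanupDemo {

    static final class SharedStateEntry {
        final String stateHandle;
        long lastUsedCheckpointID;

        SharedStateEntry(String stateHandle, long checkpointID) {
            this.stateHandle = stateHandle;
            this.lastUsedCheckpointID = checkpointID;
        }
    }

    private final Map<String, SharedStateEntry> registeredStates = new HashMap<>();

    void registerReference(String key, String handle, long checkpointID) {
        synchronized (registeredStates) {
            SharedStateEntry entry =
                    registeredStates.computeIfAbsent(
                            key, k -> new SharedStateEntry(handle, checkpointID));
            entry.lastUsedCheckpointID = Math.max(checkpointID, entry.lastUsedCheckpointID);
        }
    }

    /** Removes entries not used since before lowestCheckpointID and returns their handles. */
    List<String> unregisterUnusedState(long lowestCheckpointID) {
        List<String> subsumed = new ArrayList<>();
        synchronized (registeredStates) {
            // Deliberate full scan, O(n) in the number of registered entries:
            // entries are keyed by registration key, not by last-used checkpoint ID,
            // so there is no index to find the unused ones directly. The scan runs
            // once per call with a single threshold, not once per state handle.
            Iterator<SharedStateEntry> it = registeredStates.values().iterator();
            while (it.hasNext()) {
                SharedStateEntry entry = it.next();
                if (entry.lastUsedCheckpointID < lowestCheckpointID) {
                    subsumed.add(entry.stateHandle);
                    it.remove();
                }
            }
        }
        return subsumed;
    }

    public static void main(String[] args) {
        UnusedStateCleanupDemo registry = new UnusedStateCleanupDemo();
        registry.registerReference("a", "handle-a", 1L);
        registry.registerReference("b", "handle-b", 1L);
        registry.registerReference("b", "handle-b", 3L); // "b" is reused by checkpoint 3
        System.out.println(registry.unregisterUnusedState(2L)); // prints [handle-a]
    }
}
```

If the scan ever showed up as a cost, one alternative would be a secondary index ordered by `lastUsedCheckpointID`. The trade-off is that the index must be updated on every registration, whereas the scan only costs something at cleanup time.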