tillrohrmann commented on a change in pull request #18237:
URL: https://github.com/apache/flink/pull/18237#discussion_r777409504
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
##########
@@ -213,28 +218,30 @@ public void releaseLocalStateForAllocationId(@Nonnull
AllocationID allocationID)
}
public void shutdown() {
-
- HashMap<AllocationID, Map<JobVertexSubtaskKey,
OwnedTaskLocalStateStore>> toRelease;
-
synchronized (lock) {
if (closed) {
return;
}
closed = true;
- toRelease = new HashMap<>(taskStateStoresByAllocationID);
taskStateStoresByAllocationID.clear();
}
- ShutdownHookUtil.removeShutdownHook(shutdownHook,
getClass().getSimpleName(), LOG);
-
LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
- for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey,
OwnedTaskLocalStateStore>> entry :
- toRelease.entrySet()) {
+ ShutdownHookUtil.removeShutdownHook(shutdownHook,
getClass().getSimpleName(), LOG);
- doRelease(entry.getValue().values());
- cleanupAllocationBaseDirs(entry.getKey());
+ if (localStateRootDirectories.isOwned()) {
Review comment:
The `Reference` is not intended to give guarantees that something is no
longer in use. The idea is to convey the notion whether you own it or not and,
thus, are responsible for the clean up. The component handing out these
references is responsible that all components are shut down before cleaning up
the value of the reference.
At the moment I think it is fine as is. If we need ref counting in the
future, we can add it later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]