[ https://issues.apache.org/jira/browse/FLINK-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007798#comment-16007798 ]
ASF GitHub Bot commented on FLINK-6534:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3870#discussion_r116179467
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -18,91 +18,137 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
/**
* A {@code SharedStateRegistry} will be deployed in the
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
* maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
*/
public class SharedStateRegistry {
private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
/** All registered state objects by an artificial key */
- private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
+ private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
+
+ /** Executor for async state deletion */
+ private final Executor asyncDisposalExecutor;
public SharedStateRegistry() {
this.registeredStates = new HashMap<>();
+ this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
--- End diff --
I fully agree that there should not be a new executor; that is why I marked
it with the TODO. This change only prepares for the full fix of FLINK-6534.
My plan for the full fix is to pass in the IO executor from the
`CompletedCheckpointStore` and use it inside the registry, so that state
disposal happens outside of any synchronization. For now, this code is a
working placeholder, and I will do the full fix as a follow-up.
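
To illustrate the plan, here is a minimal sketch, not the actual patch. It
assumes a simplified registry: `RegistrySketch`, `DisposableState`, and the
plain `String` keys are hypothetical stand-ins, not Flink classes. The
executor is handed in by the caller, reference counts are updated under the
lock, and the disposal itself is submitted only after the lock is released.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a shared state handle; not Flink's interface. */
interface DisposableState {
    void discardState() throws Exception;
}

/** Sketch: ref-counting registry that disposes state through an injected executor. */
class RegistrySketch {

    /** A registered state object together with its reference count. */
    private static final class Entry {
        final DisposableState state;
        int referenceCount = 1;

        Entry(DisposableState state) {
            this.state = state;
        }
    }

    private final Map<String, Entry> registeredStates = new HashMap<>();

    /** Supplied by the owner of the registry instead of being created here. */
    private final Executor asyncDisposalExecutor;

    RegistrySketch(Executor asyncDisposalExecutor) {
        this.asyncDisposalExecutor = Objects.requireNonNull(asyncDisposalExecutor);
    }

    /** Registers a reference; an already known key only has its count increased. */
    void register(String key, DisposableState state) {
        synchronized (registeredStates) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                registeredStates.put(key, new Entry(state));
            } else {
                entry.referenceCount++;
            }
        }
    }

    /** Releases one reference; returns true if disposal was scheduled. */
    boolean unregister(String key) {
        final DisposableState toDispose;
        synchronized (registeredStates) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                throw new IllegalStateException("Unknown key: " + key);
            }
            if (--entry.referenceCount > 0) {
                return false;
            }
            registeredStates.remove(key);
            toDispose = entry.state;
        }
        // The lock is released here, so slow deletes cannot block other callers.
        asyncDisposalExecutor.execute(() -> {
            try {
                toDispose.discardState();
            } catch (Exception e) {
                System.err.println("Could not dispose state for key " + key + ": " + e);
            }
        });
        return true;
    }
}
```

With a direct (same-thread) executor, this behaves like the placeholder in the
diff; with an IO thread pool, the delete no longer runs on the calling thread.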
> SharedStateRegistry is disposing state handles from main thread
> ---------------------------------------------------------------
>
> Key: FLINK-6534
> URL: https://issues.apache.org/jira/browse/FLINK-6534
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the {{SharedStateRegistry}} is deleting state handles that are no
> longer referenced under the registry's lock and from the main thread. We
> should use the {{CheckpointCoordinator}}'s async IO executor to make this
> non-blocking.