pnowojski commented on a change in pull request #17774:
URL: https://github.com/apache/flink/pull/17774#discussion_r766636670



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
##########
@@ -269,65 +143,23 @@ public String toString() {
         }
     }
 
-    /** The result of an attempt to (un)/reference state */
-    public static class Result {
-
-        /** The (un)registered state handle from the request */
-        private final StreamStateHandle reference;
+    SharedStateRegistry NO_OP =
+            new SharedStateRegistry() {
+                @Override
+                public void close() {}

Review comment:
       Can you move the addition of the `NO_OP` implementation to the commit that starts using it?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
##########
@@ -135,126 +58,77 @@ public Result registerReference(
      *     the state handle, or null if the state handle was deleted through this request. Returns
      *     null if the registry was previously closed.
      */
-    public Result unregisterReference(SharedStateRegistryKey registrationKey) {
-
-        Preconditions.checkNotNull(registrationKey);
-
-        final Result result;
-        final StreamStateHandle scheduledStateDeletion;
-        SharedStateRegistry.SharedStateEntry entry;
-
-        synchronized (registeredStates) {
-            entry = registeredStates.get(registrationKey);
-
-            Preconditions.checkState(
-                    entry != null,
-                    "Cannot unregister a state that is not registered: %s",
-                    registrationKey);
-
-            entry.decreaseReferenceCount();
-
-            // Remove the state from the registry when it's not referenced any more.
-            if (entry.getReferenceCount() <= 0) {
-                registeredStates.remove(registrationKey);
-                scheduledStateDeletion = entry.getStateHandle();
-                result = new Result(null, 0);
-            } else {
-                scheduledStateDeletion = null;
-                result = new Result(entry);
-            }
-        }
-
-        LOG.trace("Unregistered shared state {} under key {}.", entry, registrationKey);
-        scheduleAsyncDelete(scheduledStateDeletion);
-        return result;
-    }
+    SharedStateRegistry.Result unregisterReference(SharedStateRegistryKey registrationKey);
 
     /**
      * Register given shared states in the registry.
      *
      * @param stateHandles The shared states to register.
      */
-    public void registerAll(Iterable<? extends CompositeStateHandle> stateHandles) {
+    void registerAll(Iterable<? extends CompositeStateHandle> stateHandles);
 
-        if (stateHandles == null) {
-            return;
-        }
+    /** The result of an attempt to (un)/reference state */
+    class Result {

Review comment:
       nit: the diff would be easier to read and less confusing if you kept the original order (`Result` below `SharedStateEntry`) or re-ordered them in a separate commit.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
##########
@@ -776,13 +775,10 @@ public void testStateRecoveryWithTopologyChange(TestScaleType scaleType) throws
                         new TestCompletedCheckpointStorageLocation());
 
         // set up the coordinator and validate the initial state
-        SharedStateRegistry sharedStateRegistry =
-                SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor());
         CheckpointCoordinator coord =
                 new CheckpointCoordinatorBuilder()
                         .setExecutionGraph(newGraph)
-                        .setCompletedCheckpointStore(storeFor(
-                                        sharedStateRegistry, () -> {}, completedCheckpoint))
+                        .setCompletedCheckpointStore(storeFor(() -> {}, completedCheckpoint))
                         .setTimer(manuallyTriggeredScheduledExecutor)

Review comment:
       I don't understand this change. It looks like an artefact of a GitHub rebase or merge conflict?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


