rkhachatryan commented on a change in pull request #11491: 
[FLINK-16513][checkpointing] Unaligned checkpoints: checkpoint metadata
URL: https://github.com/apache/flink/pull/11491#discussion_r400250969
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
 ##########
 @@ -43,68 +55,72 @@
         * Test that the runnable futures are executed and the result is 
correctly extracted.
         */
        @Test
-       public void testRunAndExtract() throws Exception{
+       public void testRunAndExtract() throws Exception {
 
                Random random = new Random(0x42);
 
-               KeyedStateHandle keyedTemplate =
-                       StateHandleDummyUtil.createNewKeyedStateHandle(new 
KeyGroupRange(0, 0));
-               OperatorStateHandle operatorTemplate =
-                       StateHandleDummyUtil.createNewOperatorStateHandle(2, 
random);
-
-               SnapshotResult<KeyedStateHandle> snapKeyMan = 
SnapshotResult.withLocalState(
-                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate),
-                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate));
-
-               SnapshotResult<KeyedStateHandle> snapKeyRaw = 
SnapshotResult.withLocalState(
-                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate),
-                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate));
-
-               SnapshotResult<OperatorStateHandle> snapOpMan = 
SnapshotResult.withLocalState(
-                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate),
-                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate));
-
-               SnapshotResult<OperatorStateHandle> snapOpRaw = 
SnapshotResult.withLocalState(
-                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate),
-                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate));
-
-               DoneFuture<SnapshotResult<KeyedStateHandle>> managedKeyed = new 
PseudoNotDoneFuture<>(snapKeyMan);
-               DoneFuture<SnapshotResult<KeyedStateHandle>> rawKeyed = new 
PseudoNotDoneFuture<>(snapKeyRaw);
-               DoneFuture<SnapshotResult<OperatorStateHandle>> managedOp = new 
PseudoNotDoneFuture<>(snapOpMan);
-               DoneFuture<SnapshotResult<OperatorStateHandle>> rawOp = new 
PseudoNotDoneFuture<>(snapOpRaw);
-
-               Assert.assertFalse(managedKeyed.isDone());
-               Assert.assertFalse(rawKeyed.isDone());
-               Assert.assertFalse(managedOp.isDone());
-               Assert.assertFalse(rawOp.isDone());
-
-               OperatorSnapshotFutures futures = new 
OperatorSnapshotFutures(managedKeyed, rawKeyed, managedOp, rawOp);
-               OperatorSnapshotFinalizer operatorSnapshotFinalizer = new 
OperatorSnapshotFinalizer(futures);
-
-               Assert.assertTrue(managedKeyed.isDone());
-               Assert.assertTrue(rawKeyed.isDone());
-               Assert.assertTrue(managedOp.isDone());
-               Assert.assertTrue(rawOp.isDone());
-
-               OperatorSubtaskState jobManagerOwnedState = 
operatorSnapshotFinalizer.getJobManagerOwnedState();
-               
Assert.assertTrue(checkResult(snapKeyMan.getJobManagerOwnedSnapshot(), 
jobManagerOwnedState.getManagedKeyedState()));
-               
Assert.assertTrue(checkResult(snapKeyRaw.getJobManagerOwnedSnapshot(), 
jobManagerOwnedState.getRawKeyedState()));
-               
Assert.assertTrue(checkResult(snapOpMan.getJobManagerOwnedSnapshot(), 
jobManagerOwnedState.getManagedOperatorState()));
-               
Assert.assertTrue(checkResult(snapOpRaw.getJobManagerOwnedSnapshot(), 
jobManagerOwnedState.getRawOperatorState()));
-
-               OperatorSubtaskState taskLocalState = 
operatorSnapshotFinalizer.getTaskLocalState();
-               
Assert.assertTrue(checkResult(snapKeyMan.getTaskLocalSnapshot(), 
taskLocalState.getManagedKeyedState()));
-               
Assert.assertTrue(checkResult(snapKeyRaw.getTaskLocalSnapshot(), 
taskLocalState.getRawKeyedState()));
-               Assert.assertTrue(checkResult(snapOpMan.getTaskLocalSnapshot(), 
taskLocalState.getManagedOperatorState()));
-               Assert.assertTrue(checkResult(snapOpRaw.getTaskLocalSnapshot(), 
taskLocalState.getRawOperatorState()));
+               KeyedStateHandle keyedTemplate = 
StateHandleDummyUtil.createNewKeyedStateHandle(new KeyGroupRange(0, 0));
+               OperatorStateHandle operatorTemplate = 
StateHandleDummyUtil.createNewOperatorStateHandle(2, random);
+               InputChannelStateHandle inputChannelTemplate = 
StateHandleDummyUtil.createNewInputChannelStateHandle(2, random);
+               ResultSubpartitionStateHandle resultSubpartitionTemplate = 
StateHandleDummyUtil.createNewResultSubpartitionStateHandle(2, random);
+
+               SnapshotResult<KeyedStateHandle> manKeyed = 
withLocalState(deepDummyCopy(keyedTemplate), deepDummyCopy(keyedTemplate));
+               SnapshotResult<KeyedStateHandle> rawKeyed = 
withLocalState(deepDummyCopy(keyedTemplate), deepDummyCopy(keyedTemplate));
+               SnapshotResult<OperatorStateHandle> manOper = 
withLocalState(deepDummyCopy(operatorTemplate), 
deepDummyCopy(operatorTemplate));
+               SnapshotResult<OperatorStateHandle> rawOper = 
withLocalState(deepDummyCopy(operatorTemplate), 
deepDummyCopy(operatorTemplate));
+               SnapshotResult<StateObjectCollection<InputChannelStateHandle>> 
inputChannel = withLocalState(
 
 Review comment:
   Local recovery is not a priority. However, I added it in places where local 
and non-local state are managed similarly, so that it will be easier to add 
support for it in the future.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to