rkhachatryan commented on a change in pull request #11491: [FLINK-16513][checkpointing] Unaligned checkpoints: checkpoint metadata URL: https://github.com/apache/flink/pull/11491#discussion_r400250969
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java ########## @@ -43,68 +55,72 @@ * Test that the runnable futures are executed and the result is correctly extracted. */ @Test - public void testRunAndExtract() throws Exception{ + public void testRunAndExtract() throws Exception { Random random = new Random(0x42); - KeyedStateHandle keyedTemplate = - StateHandleDummyUtil.createNewKeyedStateHandle(new KeyGroupRange(0, 0)); - OperatorStateHandle operatorTemplate = - StateHandleDummyUtil.createNewOperatorStateHandle(2, random); - - SnapshotResult<KeyedStateHandle> snapKeyMan = SnapshotResult.withLocalState( - StateHandleDummyUtil.deepDummyCopy(keyedTemplate), - StateHandleDummyUtil.deepDummyCopy(keyedTemplate)); - - SnapshotResult<KeyedStateHandle> snapKeyRaw = SnapshotResult.withLocalState( - StateHandleDummyUtil.deepDummyCopy(keyedTemplate), - StateHandleDummyUtil.deepDummyCopy(keyedTemplate)); - - SnapshotResult<OperatorStateHandle> snapOpMan = SnapshotResult.withLocalState( - StateHandleDummyUtil.deepDummyCopy(operatorTemplate), - StateHandleDummyUtil.deepDummyCopy(operatorTemplate)); - - SnapshotResult<OperatorStateHandle> snapOpRaw = SnapshotResult.withLocalState( - StateHandleDummyUtil.deepDummyCopy(operatorTemplate), - StateHandleDummyUtil.deepDummyCopy(operatorTemplate)); - - DoneFuture<SnapshotResult<KeyedStateHandle>> managedKeyed = new PseudoNotDoneFuture<>(snapKeyMan); - DoneFuture<SnapshotResult<KeyedStateHandle>> rawKeyed = new PseudoNotDoneFuture<>(snapKeyRaw); - DoneFuture<SnapshotResult<OperatorStateHandle>> managedOp = new PseudoNotDoneFuture<>(snapOpMan); - DoneFuture<SnapshotResult<OperatorStateHandle>> rawOp = new PseudoNotDoneFuture<>(snapOpRaw); - - Assert.assertFalse(managedKeyed.isDone()); - Assert.assertFalse(rawKeyed.isDone()); - Assert.assertFalse(managedOp.isDone()); - Assert.assertFalse(rawOp.isDone()); - - OperatorSnapshotFutures futures = new 
OperatorSnapshotFutures(managedKeyed, rawKeyed, managedOp, rawOp); - OperatorSnapshotFinalizer operatorSnapshotFinalizer = new OperatorSnapshotFinalizer(futures); - - Assert.assertTrue(managedKeyed.isDone()); - Assert.assertTrue(rawKeyed.isDone()); - Assert.assertTrue(managedOp.isDone()); - Assert.assertTrue(rawOp.isDone()); - - OperatorSubtaskState jobManagerOwnedState = operatorSnapshotFinalizer.getJobManagerOwnedState(); - Assert.assertTrue(checkResult(snapKeyMan.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getManagedKeyedState())); - Assert.assertTrue(checkResult(snapKeyRaw.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getRawKeyedState())); - Assert.assertTrue(checkResult(snapOpMan.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getManagedOperatorState())); - Assert.assertTrue(checkResult(snapOpRaw.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getRawOperatorState())); - - OperatorSubtaskState taskLocalState = operatorSnapshotFinalizer.getTaskLocalState(); - Assert.assertTrue(checkResult(snapKeyMan.getTaskLocalSnapshot(), taskLocalState.getManagedKeyedState())); - Assert.assertTrue(checkResult(snapKeyRaw.getTaskLocalSnapshot(), taskLocalState.getRawKeyedState())); - Assert.assertTrue(checkResult(snapOpMan.getTaskLocalSnapshot(), taskLocalState.getManagedOperatorState())); - Assert.assertTrue(checkResult(snapOpRaw.getTaskLocalSnapshot(), taskLocalState.getRawOperatorState())); + KeyedStateHandle keyedTemplate = StateHandleDummyUtil.createNewKeyedStateHandle(new KeyGroupRange(0, 0)); + OperatorStateHandle operatorTemplate = StateHandleDummyUtil.createNewOperatorStateHandle(2, random); + InputChannelStateHandle inputChannelTemplate = StateHandleDummyUtil.createNewInputChannelStateHandle(2, random); + ResultSubpartitionStateHandle resultSubpartitionTemplate = StateHandleDummyUtil.createNewResultSubpartitionStateHandle(2, random); + + SnapshotResult<KeyedStateHandle> manKeyed = withLocalState(deepDummyCopy(keyedTemplate), 
deepDummyCopy(keyedTemplate)); + SnapshotResult<KeyedStateHandle> rawKeyed = withLocalState(deepDummyCopy(keyedTemplate), deepDummyCopy(keyedTemplate)); + SnapshotResult<OperatorStateHandle> manOper = withLocalState(deepDummyCopy(operatorTemplate), deepDummyCopy(operatorTemplate)); + SnapshotResult<OperatorStateHandle> rawOper = withLocalState(deepDummyCopy(operatorTemplate), deepDummyCopy(operatorTemplate)); + SnapshotResult<StateObjectCollection<InputChannelStateHandle>> inputChannel = withLocalState( Review comment: Local recovery is not a priority, but I added it in places where local and non-local state are managed similarly, so that it will be easier to add support for it in the future. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services