AHeise commented on a change in pull request #12493:
URL: https://github.com/apache/flink/pull/12493#discussion_r435865119



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
##########
@@ -110,6 +118,69 @@ public void testNoDataProcessedAfterCheckpointBarrier() 
throws Exception {
                assertEquals(0, output.getNumberOfEmittedRecords());
        }
 
+       @Test
+       public void testSnapshotWithTwoInputGates() throws Exception {
+               CheckpointBarrierUnaligner unaligner = new 
CheckpointBarrierUnaligner(
+                               new int[]{ 1, 1 },
+                               ChannelStateWriter.NO_OP,
+                               "test",
+                               new DummyCheckpointInvokable());
+
+               SingleInputGate inputGate1 = new 
SingleInputGateBuilder().setSingleInputGateIndex(0).build();
+               RemoteInputChannel channel1 = 
InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate1);
+               inputGate1.setInputChannels(channel1);
+               
inputGate1.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
+               StreamTaskNetworkInput<Long> input1 = createInput(unaligner, 
inputGate1);
+
+               SingleInputGate inputGate2 = new 
SingleInputGateBuilder().setSingleInputGateIndex(1).build();
+               RemoteInputChannel channel2 = 
InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate2);
+               inputGate2.setInputChannels(channel2);
+               
inputGate2.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
+               StreamTaskNetworkInput<Long> input2 = createInput(unaligner, 
inputGate2);
+
+               CheckpointBarrier barrier = new CheckpointBarrier(0, 0L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               channel1.onBuffer(EventSerializer.toBuffer(barrier), 0, 0);
+               channel1.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(1), 1, 
0);
+
+               // all records on inputGate2 are now in-flight
+               channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(2), 0, 
0);
+               channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(3), 1, 
0);
+
+               // now snapshot all inflight buffers
+               RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
+               channelStateWriter.start(0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               CompletableFuture<Void> completableFuture1 = 
input1.prepareSnapshot(channelStateWriter, 0);
+               CompletableFuture<Void> completableFuture2 = 
input2.prepareSnapshot(channelStateWriter, 0);
+
+               // finish unaligned checkpoint on input side
+               channel2.onBuffer(EventSerializer.toBuffer(barrier), 2, 0);
+
+               // futures should be completed
+               completableFuture1.join();
+               completableFuture2.join();
+
+               
assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()), 
Collections.emptyList());
+               assertEquals(Arrays.asList(2, 3),

Review comment:
       Ended up splitting the line by introducing an additional assignment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to