AHeise commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r421430784

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
I was more concerned that the alternation currently does not work when a barrier is received through notify. In particular, a UC is only triggered when the first barrier is processed rather than when it is received, which defeats the purpose of UC.


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -551,7 +551,7 @@ void onSenderBacklog(int backlog) throws IOException {
     }

     public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-        boolean recycleBuffer = true;
+        boolean enqueued = false;

Review comment:
While `enqueued` is more semantic, I'd assume that the technical name `recycleBuffer` is more explicit with respect to bugs.


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -291,7 +291,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
      * returns null in all other cases.
      */
     @Nullable
-    protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {
+    CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {

Review comment:
But you are decreasing visibility here...


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -300,7 +300,7 @@ protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws I
         // reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer.
         // we can further improve to avoid double deserialization in the future.
         buffer.setReaderIndex(0);
-        return event.getClass() == CheckpointBarrier.class ? (CheckpointBarrier) event : null;
+        return event instanceof CheckpointBarrier ? (CheckpointBarrier) event : null;

Review comment:
Do we even have any subclass here?


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
##########
@@ -124,7 +124,7 @@ public SingleInputGate create(
             bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
         }

-        SingleInputGate inputGate = new SingleInputGate(

Review comment:
Are the last three hotfix commits even used in the PR? (Introducing interfaces for gates/channels?)
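
On the `getClass()` vs. `instanceof` question above: the two checks differ only when a subclass (or `null`) shows up. A minimal sketch follows, using hypothetical stand-in classes (`Event`, `Barrier`, `SpecialBarrier`) that are not Flink's real types. Whether `CheckpointBarrier` actually has subclasses is exactly the open question in the comment, so `SpecialBarrier` is purely an assumption for illustration.

```java
public class BarrierTypeCheck {
    // Hypothetical stand-ins for illustration only; these are NOT Flink classes.
    static class Event {}
    static class Barrier extends Event {}
    static class SpecialBarrier extends Barrier {} // assumed subclass, may not exist in practice

    // Mirrors the old check: matches only the exact runtime class.
    static Barrier exactClassOrNull(Event event) {
        return event.getClass() == Barrier.class ? (Barrier) event : null;
    }

    // Mirrors the new check: matches the class and any subclass, and is null-safe.
    static Barrier instanceOfOrNull(Event event) {
        return event instanceof Barrier ? (Barrier) event : null;
    }

    public static void main(String[] args) {
        Event plain = new Barrier();
        Event sub = new SpecialBarrier();

        System.out.println(exactClassOrNull(plain) != null); // true
        System.out.println(instanceOfOrNull(plain) != null); // true
        System.out.println(exactClassOrNull(sub) != null);   // false: subclass is rejected
        System.out.println(instanceOfOrNull(sub) != null);   // true: subclass is accepted
    }
}
```

If no subclass exists, both variants behave the same for non-null events, and the change is only a matter of style.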
