This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c9773f411e7 [FLINK-38216][checkpoint][refactor] Split EndOfChannelStateEvent into EndOfInputChannelStateEvent and EndOfOutputChannelStateEvent
c9773f411e7 is described below
commit c9773f411e7c8aab01f875877820a788a079828b
Author: Rui Fan <[email protected]>
AuthorDate: Wed Aug 13 16:13:22 2025 +0200
[FLINK-38216][checkpoint][refactor] Split EndOfChannelStateEvent into EndOfInputChannelStateEvent and EndOfOutputChannelStateEvent
---
.../io/network/api/serialization/EventSerializer.java | 19 +++++++++++++------
.../flink/runtime/io/network/buffer/Buffer.java | 4 ++--
.../network/partition/PipelinedResultPartition.java | 4 ++--
...ateEvent.java => EndOfInputChannelStateEvent.java} | 16 ++++++----------
...teEvent.java => EndOfOutputChannelStateEvent.java} | 10 +++++-----
.../partition/consumer/RecoveredInputChannel.java | 9 +++++----
.../runtime/io/AbstractStreamTaskNetworkInput.java | 4 ++--
.../io/checkpointing/CheckpointedInputGate.java | 4 ++--
.../api/serialization/EventSerializerTest.java | 10 ++++++++++
.../io/checkpointing/CheckpointedInputGateTest.java | 8 ++++----
.../runtime/io/StreamTaskNetworkInputTest.java | 6 +++---
11 files changed, 54 insertions(+), 40 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index f30e8e23a5a..e73d9168cb2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -43,7 +43,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.InstantiationUtil;
@@ -70,7 +71,7 @@ public class EventSerializer {
private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
- private static final int END_OF_CHANNEL_STATE_EVENT = 5;
+ private static final int END_OF_OUTPUT_CHANNEL_STATE_EVENT = 5;
private static final int ANNOUNCEMENT_EVENT = 6;
@@ -84,6 +85,8 @@ public class EventSerializer {
private static final int GENERALIZED_WATERMARK_EVENT = 11;
+ private static final int END_OF_INPUT_CHANNEL_STATE_EVENT = 12;
+
private static final byte CHECKPOINT_TYPE_CHECKPOINT = 0;
private static final byte CHECKPOINT_TYPE_SAVEPOINT = 1;
@@ -109,8 +112,10 @@ public class EventSerializer {
return serializeCheckpointBarrier((CheckpointBarrier) event);
} else if (eventClass == EndOfSuperstepEvent.class) {
return ByteBuffer.wrap(new byte[] {0, 0, 0,
END_OF_SUPERSTEP_EVENT});
- } else if (eventClass == EndOfChannelStateEvent.class) {
- return ByteBuffer.wrap(new byte[] {0, 0, 0,
END_OF_CHANNEL_STATE_EVENT});
+ } else if (eventClass == EndOfOutputChannelStateEvent.class) {
+ return ByteBuffer.wrap(new byte[] {0, 0, 0,
END_OF_OUTPUT_CHANNEL_STATE_EVENT});
+ } else if (eventClass == EndOfInputChannelStateEvent.class) {
+ return ByteBuffer.wrap(new byte[] {0, 0, 0,
END_OF_INPUT_CHANNEL_STATE_EVENT});
} else if (eventClass == EndOfData.class) {
return ByteBuffer.wrap(
new byte[] {
@@ -197,8 +202,10 @@ public class EventSerializer {
return deserializeCheckpointBarrier(buffer);
} else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
- } else if (type == END_OF_CHANNEL_STATE_EVENT) {
- return EndOfChannelStateEvent.INSTANCE;
+ } else if (type == END_OF_OUTPUT_CHANNEL_STATE_EVENT) {
+ return EndOfOutputChannelStateEvent.INSTANCE;
+ } else if (type == END_OF_INPUT_CHANNEL_STATE_EVENT) {
+ return EndOfInputChannelStateEvent.INSTANCE;
} else if (type == END_OF_USER_RECORDS_EVENT) {
return new EndOfData(StopMode.values()[buffer.get()]);
} else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 6c9ae404200..38c06411b38 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
@@ -408,7 +408,7 @@ public interface Buffer {
public static DataType getDataType(AbstractEvent event, boolean
hasPriority) {
if (hasPriority) {
return PRIORITIZED_EVENT_BUFFER;
- } else if (event instanceof EndOfChannelStateEvent) {
+ } else if (event instanceof EndOfOutputChannelStateEvent) {
return RECOVERY_COMPLETION;
} else if (event instanceof EndOfData) {
return END_OF_DATA;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index 268f34c9b13..0060bf7b35e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -28,7 +28,7 @@ import
org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.util.function.SupplierWithException;
import javax.annotation.Nullable;
@@ -262,7 +262,7 @@ public class PipelinedResultPartition extends
BufferWritingResultPartition
return;
}
try (BufferConsumer eventBufferConsumer =
-
EventSerializer.toBufferConsumer(EndOfChannelStateEvent.INSTANCE, false)) {
+
EventSerializer.toBufferConsumer(EndOfOutputChannelStateEvent.INSTANCE, false))
{
for (int i = 0; i < subpartitions.length; i++) {
if (((PipelinedSubpartition)
subpartitions[i]).isSupportChannelStateRecover()) {
addToSubpartition(i, eventBufferConsumer.copy(), 0);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
similarity index 77%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
index ea0f71aa17b..e9f39f71736 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
@@ -22,20 +22,16 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.RuntimeEvent;
-/**
- * Marks the end of recovered state of {@link RecoveredInputChannel} of this
subtask or {@link
- * org.apache.flink.runtime.io.network.partition.ResultSubpartition
ResultSubpartition} on the
- * upstream.
- */
-public class EndOfChannelStateEvent extends RuntimeEvent {
+/** Marks the end of recovered state of {@link RecoveredInputChannel} of this
subtask. */
+public class EndOfInputChannelStateEvent extends RuntimeEvent {
/** The singleton instance of this event. */
- public static final EndOfChannelStateEvent INSTANCE = new
EndOfChannelStateEvent();
+ public static final EndOfInputChannelStateEvent INSTANCE = new
EndOfInputChannelStateEvent();
// ------------------------------------------------------------------------
// not instantiable
- private EndOfChannelStateEvent() {}
+ private EndOfInputChannelStateEvent() {}
// ------------------------------------------------------------------------
@@ -53,12 +49,12 @@ public class EndOfChannelStateEvent extends RuntimeEvent {
@Override
public int hashCode() {
- return 1965146670;
+ return 20250813;
}
@Override
public boolean equals(Object obj) {
- return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
+ return obj != null && obj.getClass() ==
EndOfInputChannelStateEvent.class;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
similarity index 83%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
index ea0f71aa17b..fd299e24f37 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
@@ -23,19 +23,19 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.RuntimeEvent;
/**
- * Marks the end of recovered state of {@link RecoveredInputChannel} of this
subtask or {@link
+ * Marks the end of recovered state of {@link
* org.apache.flink.runtime.io.network.partition.ResultSubpartition
ResultSubpartition} on the
* upstream.
*/
-public class EndOfChannelStateEvent extends RuntimeEvent {
+public class EndOfOutputChannelStateEvent extends RuntimeEvent {
/** The singleton instance of this event. */
- public static final EndOfChannelStateEvent INSTANCE = new
EndOfChannelStateEvent();
+ public static final EndOfOutputChannelStateEvent INSTANCE = new
EndOfOutputChannelStateEvent();
// ------------------------------------------------------------------------
// not instantiable
- private EndOfChannelStateEvent() {}
+ private EndOfOutputChannelStateEvent() {}
// ------------------------------------------------------------------------
@@ -58,7 +58,7 @@ public class EndOfChannelStateEvent extends RuntimeEvent {
@Override
public boolean equals(Object obj) {
- return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
+ return obj != null && obj.getClass() ==
EndOfOutputChannelStateEvent.class;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 1f41a099931..1d31a65f49a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -154,7 +154,8 @@ public abstract class RecoveredInputChannel extends
InputChannel implements Chan
}
public void finishReadRecoveredState() throws IOException {
-
onRecoveredStateBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE,
false));
+ onRecoveredStateBuffer(
+ EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE,
false));
bufferManager.releaseFloatingBuffers();
LOG.debug("{}/{} finished recovering input.",
inputGate.getOwningTaskName(), channelInfo);
}
@@ -172,7 +173,7 @@ public abstract class RecoveredInputChannel extends
InputChannel implements Chan
if (next == null) {
return null;
- } else if (isEndOfChannelStateEvent(next)) {
+ } else if (isEndOfInputChannelStateEvent(next)) {
stateConsumedFuture.complete(null);
return null;
} else {
@@ -180,14 +181,14 @@ public abstract class RecoveredInputChannel extends
InputChannel implements Chan
}
}
- private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException
{
+ private boolean isEndOfInputChannelStateEvent(Buffer buffer) throws
IOException {
if (buffer.isBuffer()) {
return false;
}
AbstractEvent event = EventSerializer.fromBuffer(buffer,
getClass().getClassLoader());
buffer.setReaderIndex(0);
- return event.getClass() == EndOfChannelStateEvent.class;
+ return event.getClass() == EndOfInputChannelStateEvent.class;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index c076f5bf433..46be9e6b585 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
@@ -270,7 +270,7 @@ public abstract class AbstractStreamTaskNetworkInput<
if (checkpointedInputGate.isFinished()) {
return DataInputStatus.END_OF_INPUT;
}
- } else if (event.getClass() == EndOfChannelStateEvent.class) {
+ } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
if (checkpointedInputGate.allChannelsRecovered()) {
return DataInputStatus.END_OF_RECOVERY;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
index f3075d3a36d..ad0dcb5b0bb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
@@ -200,7 +200,7 @@ public class CheckpointedInputGate implements
PullingAsyncDataInput<BufferOrEven
announcedBarrier,
eventAnnouncement.getSequenceNumber(),
bufferOrEvent.getChannelInfo());
- } else if (bufferOrEvent.getEvent().getClass() ==
EndOfChannelStateEvent.class) {
+ } else if (bufferOrEvent.getEvent().getClass() ==
EndOfOutputChannelStateEvent.class) {
upstreamRecoveryTracker.handleEndOfRecovery(bufferOrEvent.getChannelInfo());
}
return Optional.of(bufferOrEvent);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ab98e82034e..25a93b379d5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -120,6 +122,9 @@ class EventSerializerTest {
new RecoveryMetadata(3),
new WatermarkEvent(new LongWatermark(42L, "test"), false),
new WatermarkEvent(new BoolWatermark(true, "test"), true),
+ new WatermarkEvent(new BoolWatermark(true, "test"), true),
+ EndOfInputChannelStateEvent.INSTANCE,
+ EndOfOutputChannelStateEvent.INSTANCE,
};
@Test
@@ -161,6 +166,9 @@ class EventSerializerTest {
assertThat(bufferConsumer.build().getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(bufferConsumer.build().getDataType())
+ .isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
} else {
assertThat(bufferConsumer.build().getDataType())
.isEqualTo(Buffer.DataType.EVENT_BUFFER);
@@ -191,6 +199,8 @@ class EventSerializerTest {
assertThat(buffer.getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+
assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
} else {
assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
index 0ddaf07721c..bb60d3bf78f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
@@ -37,7 +37,7 @@ import
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -86,7 +86,7 @@ class CheckpointedInputGateTest {
enqueueEndOfState(gate, channelIndex);
Optional<BufferOrEvent> bufferOrEvent = gate.pollNext();
while (bufferOrEvent.isPresent()
- && bufferOrEvent.get().getEvent() instanceof
EndOfChannelStateEvent
+ && bufferOrEvent.get().getEvent() instanceof
EndOfOutputChannelStateEvent
&& !gate.allChannelsRecovered()) {
bufferOrEvent = gate.pollNext();
}
@@ -97,7 +97,7 @@ class CheckpointedInputGateTest {
Optional<BufferOrEvent> polled = gate.pollNext();
assertThat(polled).isPresent();
assertThat(polled.get().isEvent()).isTrue();
-
assertThat(polled.get().getEvent()).isEqualTo(EndOfChannelStateEvent.INSTANCE);
+
assertThat(polled.get().getEvent()).isEqualTo(EndOfOutputChannelStateEvent.INSTANCE);
assertThat(resumeCounter.getNumResumed()).isEqualTo(numberOfChannels);
assertThat(gate.pollNext())
.as("should only be a single event no matter of what is
the number of channels")
@@ -282,7 +282,7 @@ class CheckpointedInputGateTest {
private void enqueueEndOfState(CheckpointedInputGate
checkpointedInputGate, int channelIndex)
throws IOException {
- enqueue(checkpointedInputGate, channelIndex,
EndOfChannelStateEvent.INSTANCE);
+ enqueue(checkpointedInputGate, channelIndex,
EndOfOutputChannelStateEvent.INSTANCE);
}
private void enqueueEndOfPartition(
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index f365b3d8df9..c9cc91b90a2 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -36,7 +36,7 @@ import
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
-import
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
@@ -229,9 +229,9 @@ class StreamTaskNetworkInputTest {
inputGate.sendElement(new StreamRecord<>(42L), 0);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
- inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 0);
+ inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 0);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
- inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 1);
+ inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 1);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.END_OF_RECOVERY);
}