pnowojski commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r411316537
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
##########
@@ -131,7 +135,12 @@
/**
* Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
*/
- public abstract void setup() throws IOException, InterruptedException;
+ public abstract void setup() throws IOException;
+
+ public abstract void initializeStateAndRequestPartitions(
Review comment:
`readRecoveredStateAndRequestPartitions`?
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -914,7 +914,9 @@ public void testInitializeResultPartitionState() throws Exception {
MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
mockEnvironment.addOutputs(Arrays.asList(partitions));
- StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build();
+ StreamConfig config = new StreamConfig(new Configuration());
+ config.setUnalignedCheckpointsEnabled(true);
Review comment:
Do we want this change here? I would like to avoid introducing many
different places that are manually disabling/enabling unaligned checkpoints.
Maybe this should be handled generically as part of this ticket
https://issues.apache.org/jira/browse/FLINK-17258 ?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -162,6 +163,95 @@ void assignExclusiveSegments() throws IOException {
// Consume
// ------------------------------------------------------------------------
+ void readRecoveredState(ChannelStateReader reader) throws IOException, InterruptedException {
+ beforeReadRecoveredState();
+
+ while (true) {
+ Buffer buffer;
+ synchronized (bufferQueue) {
+ buffer = bufferQueue.takeBuffer();
+ if (buffer == null) {
+ if (isReleased()) {
+ return;
+ }
+ if (!isWaitingForFloatingBuffers) {
+ buffer = inputGate.getBufferPool().requestBuffer();
+ if (buffer == null) {
+ inputGate.getBufferProvider().addBufferListener(this);
+ isWaitingForFloatingBuffers = true;
+ }
+ }
+ }
+ if (buffer == null) {
+ bufferQueue.wait();
+ continue;
+ }
+ }
+
+ ChannelStateReader.ReadResult result = internalReaderRecoveredState(reader, buffer);
+ if (result == ChannelStateReader.ReadResult.NO_MORE_DATA) {
+ return;
+ }
+ }
+ }
+
+ private void beforeReadRecoveredState() {
Review comment:
`initializeCreditsForRecoveringState`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -222,19 +225,65 @@ public SingleInputGate(
}
@Override
- public void setup() throws IOException, InterruptedException {
+ public void setup() throws IOException {
checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
// assign exclusive buffers to input channels directly and use the rest for floating buffers
assignExclusiveSegments();
BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);
+ }
- requestPartitions();
+ @Override
+ public void initializeStateAndRequestPartitions(
+ boolean hasStates,
+ @Nullable ExecutorService executor,
+ ChannelStateReader reader) throws Exception {
+
+ if (hasStates) {
+ checkNotNull(executor);
+ readRecoveredStateBeforeRequestPartition(executor, reader);
+ } else {
+ requestPartitions();
+ }
Review comment:
This doesn't look right - boolean `hasStates` and `@Nullable executor` -
it looks like this check should be on a different layer. As this method looks
like it's called only in one place, shouldn't it be just inlined?
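
A rough sketch of what moving the check up one layer could look like. This is not the Flink code: `Gate`, `GateInitSketch` and `initializeGate` are hypothetical stand-ins, the `ChannelStateReader` argument is left out, and the recovery method only records that it ran. The gate would expose two methods with non-null parameters, and the single caller would pick one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class GateInitSketch {

    /** Hypothetical stand-in for the input gate; it only records which path ran. */
    static class Gate {
        final List<String> calls = Collections.synchronizedList(new ArrayList<>());

        // Recovery path: the executor is a required argument here, never null.
        void readRecoveredStateBeforeRequestPartition(ExecutorService executor) {
            try {
                // Run the (simulated) state read on the executor and wait for it.
                executor.submit(() -> { calls.add("readRecoveredState"); }).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        // Normal path: nothing to recover, no executor involved.
        void requestPartitions() {
            calls.add("requestPartitions");
        }
    }

    /** Hypothetical caller (assumed to sit on the task layer): the branch lives here. */
    static void initializeGate(Gate gate, boolean hasRecoveredState, ExecutorService executor) {
        if (hasRecoveredState) {
            gate.readRecoveredStateBeforeRequestPartition(executor);
        } else {
            gate.requestPartitions();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Gate recovering = new Gate();
            initializeGate(recovering, true, executor);
            Gate fresh = new Gate();
            initializeGate(fresh, false, executor);
            System.out.println(recovering.calls + " " + fresh.calls);
        } finally {
            executor.shutdown();
        }
    }
}
```

With this split the gate no longer needs a boolean flag or a `@Nullable` parameter in its signature; whether the caller-side branch then stays in a helper or is inlined is a separate choice.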
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -204,6 +205,9 @@
protected final MailboxProcessor mailboxProcessor;
+ @Nullable
+ private final ExecutorService channelStateUnspillingExecutor;
Review comment:
Add a `TODO` that it should be replaced by a global TaskManager
ioExecutor?