pnowojski commented on code in PR #19380:
URL: https://github.com/apache/flink/pull/19380#discussion_r953608744


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionRequestNotifier.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+
+import java.io.IOException;
+
+/**
+ * When the netty server receives a downstream task's partition request event and finds its upstream task doesn't register its partition yet,
+ * the netty server will construct a {@link PartitionRequestNotifier} and notify the request when the task deploys itself and
+ * registers its partition to {@link ResultPartitionManager}.
+ */
+public interface PartitionRequestNotifier {
+
+    /**
+     * The creation timestamp of this notifier, it's used to check whether the notifier is timeout.
+     *
+     * @return the creation timestamp
+     */
+    long getCreateTimestamp();
+
+    /**
+     * Get the result partition id of the notifier.
+     *
+     * @return the result partition id
+     */
+    ResultPartitionID getResultPartitionId();
+
+    /**
+     * Get the view reader of the notifier.
+     *
+     * @return the view reader
+     */
+    NetworkSequenceViewReader getViewReader();
+
+    /**
+     * Get the input channel id of the notifier.
+     *
+     * @return the input channel id
+     */
+    InputChannelID getReceiverId();
+
+    /**
+     * Notify the pending partition request when the given partition is registered.
+     *
+     * @param partition The registered partition.
+     */
+    void notifyPartitionRequest(ResultPartition partition) throws IOException;
+
+    /**
+     * When the partition request notifier is timeout, it needs to notify {@link NetworkSequenceViewReader} of the message.
+     */
+    void notifyPartitionRequestTimeout();

Review Comment:
   `notifyPartitionCreated` and `notifyPartitionCreatedTimeout`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionRequestNotifier.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+
+import java.io.IOException;
+
+/**
+ * When the netty server receives a downstream task's partition request event and finds its upstream task doesn't register its partition yet,
+ * the netty server will construct a {@link PartitionRequestNotifier} and notify the request when the task deploys itself and
+ * registers its partition to {@link ResultPartitionManager}.
+ */
+public interface PartitionRequestNotifier {

Review Comment:
   Shouldn't this be called `PartitionCreationListener`? This is NOT notifying
about a partition request. Rather, the partition request creates/uses this
"listener", when the partition hasn't been created yet, to listen for the
partition creation notification.
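   For illustration only, a minimal sketch of the interface under the names
suggested in this and the previous comment (`PartitionCreationListener`,
`notifyPartitionCreated`, `notifyPartitionCreatedTimeout`). It assumes the other
getters from the PR stay unchanged and omits them; the partition is a type
parameter `P` instead of Flink's `ResultPartition` so the snippet compiles on its
own, and `RecordingCreationListener` is a hypothetical test helper.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Registered by a partition request whose partition has not been created yet.
 * P stands in for Flink's ResultPartition in this sketch.
 */
interface PartitionCreationListener<P> {

    /** Creation time in milliseconds, used to decide whether the listener has timed out. */
    long getCreateTimestamp();

    /** Called once the producer registers the partition this listener waits for. */
    void notifyPartitionCreated(P partition) throws IOException;

    /** Called if the partition was not created before the notify timeout expired. */
    void notifyPartitionCreatedTimeout();
}

/** Hypothetical test helper that records which callbacks were invoked. */
final class RecordingCreationListener<P> implements PartitionCreationListener<P> {

    private final long createTimestamp;
    final List<P> createdPartitions = new ArrayList<>();
    boolean timedOut;

    RecordingCreationListener(long createTimestamp) {
        this.createTimestamp = createTimestamp;
    }

    @Override
    public long getCreateTimestamp() {
        return createTimestamp;
    }

    @Override
    public void notifyPartitionCreated(P partition) {
        createdPartitions.add(partition);
    }

    @Override
    public void notifyPartitionCreatedTimeout() {
        timedOut = true;
    }
}
```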



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/InputRequestNotifierManager.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Manages partition request notifier with input channel id.
+ */
+public class InputRequestNotifierManager {

Review Comment:
   `PartitionCreationListenerManager`?
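   A possible shape for such a manager, as a self-contained sketch: receiver ids
are plain `String`s instead of `InputChannelID`, each listener is stored with its
registration time, and `removeExpired` (a hypothetical method, not from the PR)
shows one place where the notify timeout check could live.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Keeps the pending listeners of one result partition, keyed by receiver (input channel) id. */
final class PartitionCreationListenerManager<L> {

    /** A listener together with the time it was registered. */
    private static final class Entry<T> {
        final T listener;
        final long registeredAtMillis;

        Entry(T listener, long registeredAtMillis) {
            this.listener = listener;
            this.registeredAtMillis = registeredAtMillis;
        }
    }

    // Insertion order keeps the notification order deterministic.
    private final Map<String, Entry<L>> listeners = new LinkedHashMap<>();

    void add(String receiverId, L listener, long nowMillis) {
        listeners.put(receiverId, new Entry<>(listener, nowMillis));
    }

    /** Removes the listener of the given receiver, returning it or null if there was none. */
    L remove(String receiverId) {
        Entry<L> entry = listeners.remove(receiverId);
        return entry == null ? null : entry.listener;
    }

    boolean isEmpty() {
        return listeners.isEmpty();
    }

    Collection<L> getListeners() {
        List<L> result = new ArrayList<>();
        for (Entry<L> entry : listeners.values()) {
            result.add(entry.listener);
        }
        return result;
    }

    /** Removes and returns all listeners registered at least timeoutMillis before nowMillis. */
    List<L> removeExpired(long nowMillis, long timeoutMillis) {
        List<L> expired = new ArrayList<>();
        Iterator<Entry<L>> it = listeners.values().iterator();
        while (it.hasNext()) {
            Entry<L> entry = it.next();
            if (nowMillis - entry.registeredAtMillis >= timeoutMillis) {
                expired.add(entry.listener);
                it.remove();
            }
        }
        return expired;
    }
}
```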



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java:
##########
@@ -40,6 +45,30 @@ void requestSubpartitionView(
             int subPartitionIndex)
             throws IOException;
 
+    /**
+     * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request.
+     *
+     * @param partition the result partition
+     * @param subPartitionIndex the sub partition index
+     * @throws IOException the thrown exception
+     */
+    void requestSubpartitionView(ResultPartition partition, int subPartitionIndex) throws IOException;

Review Comment:
   Can you remove the old version of this method? Having two
`requestSubpartitionView` methods is confusing, especially since the old one
seems to be used only in tests after your change.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java:
##########
@@ -40,6 +45,30 @@ void requestSubpartitionView(
             int subPartitionIndex)
             throws IOException;
 
+    /**
+     * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request.
+     *
+     * @param partition the result partition
+     * @param subPartitionIndex the sub partition index
+     * @throws IOException the thrown exception
+     */
+    void requestSubpartitionView(ResultPartition partition, int subPartitionIndex) throws IOException;

Review Comment:
   maybe rename this one to `notifySubpartitionCreated`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java:
##########
@@ -40,6 +45,30 @@ void requestSubpartitionView(
             int subPartitionIndex)
             throws IOException;
 
+    /**
+     * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request.
+     *
+     * @param partition the result partition
+     * @param subPartitionIndex the sub partition index
+     * @throws IOException the thrown exception
+     */
+    void requestSubpartitionView(ResultPartition partition, int subPartitionIndex) throws IOException;
+
+    /**
+     * When the netty server receives the downstream task's partition request and the upstream task has registered its partition,
+     * it will process the partition request immediately, otherwise it will create a {@link PartitionRequestNotifier} for given {@link ResultPartitionID}
+     * in {@link ResultPartitionManager} and notify the request when the upstream task registers its partition.
+     *
+     * @param partitionProvider the result partition provider
+     * @param resultPartitionId the result partition id
+     * @param subPartitionIndex the sub partition index
+     * @throws IOException the thrown exception
+     */
+    void requestSubpartitionViewOrNotify(

Review Comment:
   Maybe rename this to something like `requestSubpartitionView`, or
`requestSubpartitionViewOrRegisterNotifier`?
   
   I think the method above actually deserves to be called `notify` more, as
that's its main purpose. Here, on the other hand, we are requesting a
subpartition or, in other words, registering a notifier (and only _maybe_
triggering the notification).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java:
##########
@@ -480,6 +493,7 @@ public int hashCode() {
         result = 31 * result + networkBufferSize;
         result = 31 * result + partitionRequestInitialBackoff;
         result = 31 * result + partitionRequestMaxBackoff;
+        result = 31 * result + partitionRequestNotifyTimeout.hashCode();

Review Comment:
   You forgot to update the `equals` method as well.
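   A trimmed-down sketch of keeping the two methods consistent: every field that
feeds `hashCode` also takes part in `equals`. Only the fields visible in this
hunk are kept, and the type of `partitionRequestNotifyTimeout`
(`java.time.Duration`) is an assumption.

```java
import java.time.Duration;
import java.util.Objects;

/** Only the fields visible in the hunk; the Duration type of the new field is an assumption. */
final class NetworkConfigSketch {

    private final int networkBufferSize;
    private final int partitionRequestInitialBackoff;
    private final int partitionRequestMaxBackoff;
    private final Duration partitionRequestNotifyTimeout;

    NetworkConfigSketch(
            int networkBufferSize,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            Duration partitionRequestNotifyTimeout) {
        this.networkBufferSize = networkBufferSize;
        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
        this.partitionRequestNotifyTimeout = partitionRequestNotifyTimeout;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        NetworkConfigSketch that = (NetworkConfigSketch) obj;
        return networkBufferSize == that.networkBufferSize
                && partitionRequestInitialBackoff == that.partitionRequestInitialBackoff
                && partitionRequestMaxBackoff == that.partitionRequestMaxBackoff
                // The new field has to be compared here too, not only hashed below.
                && Objects.equals(partitionRequestNotifyTimeout, that.partitionRequestNotifyTimeout);
    }

    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + networkBufferSize;
        result = 31 * result + partitionRequestInitialBackoff;
        result = 31 * result + partitionRequestMaxBackoff;
        result = 31 * result + Objects.hashCode(partitionRequestNotifyTimeout);
        return result;
    }
}
```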



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -101,6 +105,55 @@ public void requestSubpartitionView(
         notifyDataAvailable();
     }
 
+    @Override
+    public void requestSubpartitionView(
+            ResultPartition partition,
+            int subPartitionIndex) throws IOException {
+        synchronized (requestLock) {
+            if (subpartitionView == null) {
+                subpartitionView = partition.createSubpartitionView(subPartitionIndex, this);
+            } else {
+                throw new IllegalStateException("Subpartition already requested");
+            }
+        }
+
+        notifyDataAvailable();
+    }
+
+    @Override
+    public void requestSubpartitionViewOrNotify(
+            ResultPartitionProvider partitionProvider,
+            ResultPartitionID resultPartitionId,
+            int subPartitionIndex) throws IOException {
+        synchronized (requestLock) {
+            if (subpartitionView == null) {
+                if (partitionRequestNotifier == null) {
+                    partitionRequestNotifier = new NettyPartitionRequestNotifier(
+                            partitionProvider,
+                            this,
+                            subPartitionIndex,
+                            resultPartitionId);
+                } else {
+                    throw new IllegalStateException("Partition request notifier already created");
+                }
+                // The partition provider will create subpartitionView if resultPartition is registered,
+                // otherwise it will return null and add a notifier of the given result partition id.
+                this.subpartitionView = partitionProvider.createSubpartitionViewOrNotify(
+                        resultPartitionId,
+                        subPartitionIndex,
+                        this,
+                        partitionRequestNotifier);
+                if (subpartitionView == null) {
+                    return;
+                }

Review Comment:
   no-op?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java:
##########
@@ -55,6 +68,29 @@ public void testCreateViewForRegisteredPartition() throws Exception {
                 partition.getPartitionId(), 0, new NoOpBufferAvailablityListener());
     }
 
+    @Test
+    public void testCreateViewNotifierAfterRegisteredPartition() throws Exception {
+        final ResultPartitionManager partitionManager = new ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        partitionManager.registerResultPartition(partition);
+        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
+        assertNotNull(partitionManager.createSubpartitionViewOrNotify(
+                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));
+    }
+
+    @Test
+    public void testCreateViewNotifierBeforeRegisteredPartition() throws Exception {
+        final ResultPartitionManager partitionManager = new ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
+        assertNull(partitionManager.createSubpartitionViewOrNotify(
+                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));
+

Review Comment:
   Instead of `assertNull`, you should:
   1. check that the list of registered listeners/notifiers in the
`ResultPartitionManager` is NOT empty
   2. assert that `partitionRequestNotifier` has NOT been notified



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java:
##########
@@ -79,8 +128,47 @@ public ResultSubpartitionView createSubpartitionView(
         return subpartitionView;
     }
 
+    @Override
+    public ResultSubpartitionView createSubpartitionViewOrNotify(

Review Comment:
   Rename to something like `registerPartitionRequestListener` or
`registerPartitionRequestNotifier`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java:
##########
@@ -40,6 +45,30 @@ void requestSubpartitionView(
             int subPartitionIndex)
             throws IOException;
 
+    /**
+     * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request.
+     *
+     * @param partition the result partition
+     * @param subPartitionIndex the sub partition index
+     * @throws IOException the thrown exception
+     */

Review Comment:
   Can you explain in the javadoc how this method and 
`requestSubpartitionViewOrNotify` should be used?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java:
##########
@@ -51,6 +89,17 @@ public void registerResultPartition(ResultPartition partition) {
                 throw new IllegalStateException("Result partition already registered.");
             }
 
+            InputRequestNotifierManager notifiers = requestPartitionNotifiers.remove(partition.getPartitionId());
+            if (notifiers != null) {
+                for (PartitionRequestNotifier notifier : notifiers.getPartitionRequestNotifiers()) {
+                    try {
+                        notifier.notifyPartitionRequest(partition);

Review Comment:
   rename to `notifier.notifyPartitionCreated(partition)`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java:
##########
@@ -88,10 +88,8 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                             new CreditBasedSequenceNumberingViewReader(
                                     request.receiverId, request.credit, outboundQueue);
 
-                    reader.requestSubpartitionView(
+                    reader.requestSubpartitionViewOrNotify(
                             partitionProvider, request.partitionId, request.queueIndex);
-
-                    outboundQueue.notifyReaderCreated(reader);

Review Comment:
   What has happened with this call? I don't see it being replaced by anything 
else? Are you updating `PartitionRequestQueue#allReaders` somewhere else?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java:
##########
@@ -104,12 +192,40 @@ public void shutdown() {
 
             registeredPartitions.clear();
 
+            requestPartitionNotifiers.clear();

Review Comment:
   I think that's a good point



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -101,6 +105,55 @@ public void requestSubpartitionView(
         notifyDataAvailable();
     }
 
+    @Override
+    public void requestSubpartitionView(
+            ResultPartition partition,
+            int subPartitionIndex) throws IOException {
+        synchronized (requestLock) {
+            if (subpartitionView == null) {

Review Comment:
   nit: you can replace this if check, and many similar checks in other places,
with one-liners like:
`checkState(subpartitionView == null, "Subpartition already requested")`
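   Roughly what that looks like, as a standalone sketch. `checkState` here is a
local copy mirroring `org.apache.flink.util.Preconditions#checkState(boolean, Object)`
(it throws `IllegalStateException`), so the snippet compiles without Flink; the
`String` view is a stand-in for `ResultSubpartitionView`.

```java
/** Sketch of the if/else from the hunk collapsed into a single precondition check. */
final class CheckStateSketch {

    /** Local stand-in for Flink's Preconditions.checkState(boolean, Object). */
    static void checkState(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalStateException(String.valueOf(errorMessage));
        }
    }

    private final Object requestLock = new Object();
    private String subpartitionView; // stand-in for ResultSubpartitionView

    void requestSubpartitionView(String view) {
        synchronized (requestLock) {
            // Replaces: if (subpartitionView == null) { ... } else { throw ... }
            checkState(subpartitionView == null, "Subpartition already requested");
            subpartitionView = view;
        }
    }
}
```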



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java:
##########
@@ -55,6 +68,29 @@ public void testCreateViewForRegisteredPartition() throws Exception {
                 partition.getPartitionId(), 0, new NoOpBufferAvailablityListener());
     }
 
+    @Test
+    public void testCreateViewNotifierAfterRegisteredPartition() throws Exception {
+        final ResultPartitionManager partitionManager = new ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        partitionManager.registerResultPartition(partition);
+        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
+        assertNotNull(partitionManager.createSubpartitionViewOrNotify(
+                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));

Review Comment:
   Instead of `assertNotNull`, you should:
   1. check that the list of registered listeners/notifiers in the
`ResultPartitionManager` is empty
   2. assert that `partitionRequestNotifier` has been notified with the correct
partition and subpartition id.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java:
##########
@@ -55,6 +68,29 @@ public void testCreateViewForRegisteredPartition() throws Exception {
                 partition.getPartitionId(), 0, new NoOpBufferAvailablityListener());
     }
 
+    @Test
+    public void testCreateViewNotifierAfterRegisteredPartition() throws Exception {
+        final ResultPartitionManager partitionManager = new ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        partitionManager.registerResultPartition(partition);
+        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
+        assertNotNull(partitionManager.createSubpartitionViewOrNotify(
+                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));
+    }
+
+    @Test
+    public void testCreateViewNotifierBeforeRegisteredPartition() throws Exception {
+        final ResultPartitionManager partitionManager = new ResultPartitionManager();
+        final ResultPartition partition = createPartition();
+
+        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
+        assertNull(partitionManager.createSubpartitionViewOrNotify(
+                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));
+
+        partitionManager.registerResultPartition(partition);

Review Comment:
   After this you could actually:
   1. check that the list of registered listeners/notifiers in the
`ResultPartitionManager` is empty
   2. assert that `partitionRequestNotifier` has been notified with the correct
partition and subpartition id.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingResultPartitionProvider.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestNotifier;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/** {@link ResultPartitionProvider} implementation for testing purposes. */
+public class TestingResultPartitionProvider implements ResultPartitionProvider {
+    private final Function<Tuple3<ResultPartitionID, Integer, BufferAvailabilityListener>, ResultSubpartitionView> createSubpartitionViewFunction;
+    private final Function<Tuple4<ResultPartitionID, Integer, BufferAvailabilityListener, PartitionRequestNotifier>, ResultSubpartitionView> createSubpartitionViewOrNotifyFunction;
+    private final Consumer<NettyPartitionRequestNotifier> releasePartitionRequestNotifierConsumer;

Review Comment:
   Instead of using generic `Function` or `Consumer`, could you create named
interfaces for those functions, and use those named interfaces here, in the
arguments and in the builder? It makes looking for implementations in the IDE
much easier, as it would then narrow down the results to actual usages. Right
now it would find all implementations of Functions/Consumers in the codebase.
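   A sketch of what named interfaces could look like; the interface names and
the simplified parameter types (`String` ids, `Object` views and listeners) are
hypothetical. A side benefit is that a named interface can declare
`throws IOException`, which `java.util.function.Function` cannot.

```java
import java.io.IOException;

/** Named replacement for the Function<Tuple3<...>, ResultSubpartitionView> field. */
@FunctionalInterface
interface CreateSubpartitionViewFunction {
    Object createSubpartitionView(String partitionId, int subpartitionIndex, Object availabilityListener)
            throws IOException;
}

/** Named replacement for the Consumer<NettyPartitionRequestNotifier> field. */
@FunctionalInterface
interface ReleasePartitionRequestListenerConsumer {
    void releasePartitionRequestListener(Object listener);
}

final class TestingResultPartitionProviderSketch {

    private final CreateSubpartitionViewFunction createSubpartitionViewFunction;
    private final ReleasePartitionRequestListenerConsumer releasePartitionRequestListenerConsumer;

    private TestingResultPartitionProviderSketch(Builder builder) {
        this.createSubpartitionViewFunction = builder.createSubpartitionViewFunction;
        this.releasePartitionRequestListenerConsumer = builder.releasePartitionRequestListenerConsumer;
    }

    Object createSubpartitionView(String partitionId, int subpartitionIndex, Object listener)
            throws IOException {
        return createSubpartitionViewFunction.createSubpartitionView(partitionId, subpartitionIndex, listener);
    }

    void releasePartitionRequestListener(Object listener) {
        releasePartitionRequestListenerConsumer.releasePartitionRequestListener(listener);
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Defaults: no view is created and releasing a listener is a no-op.
        private CreateSubpartitionViewFunction createSubpartitionViewFunction = (id, index, listener) -> null;
        private ReleasePartitionRequestListenerConsumer releasePartitionRequestListenerConsumer = listener -> {};

        Builder setCreateSubpartitionViewFunction(CreateSubpartitionViewFunction function) {
            this.createSubpartitionViewFunction = function;
            return this;
        }

        Builder setReleasePartitionRequestListenerConsumer(ReleasePartitionRequestListenerConsumer consumer) {
            this.releasePartitionRequestListenerConsumer = consumer;
            return this;
        }

        TestingResultPartitionProviderSketch build() {
            return new TestingResultPartitionProviderSketch(this);
        }
    }
}
```

With named types, an IDE search for implementations of
`CreateSubpartitionViewFunction` only returns the lambdas passed to this builder.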



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -451,6 +451,14 @@ public class NettyShuffleEnvironmentOptions {
                    .withDescription(
                            "Maximum backoff in milliseconds for partition requests of input channels.");
 
+    /** The timeout for partition request notifier. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Long> NETWORK_REQUEST_NOTIFY_TIMEOUT =
+            key("taskmanager.network.partition-request-notify-timeout-ms")
+                    .longType()
+                    .defaultValue(10000L)
+                    .withDescription("Timeout in milliseconds for partition request notifier in result partition manager.");
+

Review Comment:
   How is this going to interplay with the pre-existing request backoff? I
have a feeling that something is missing here.
   
   The current default behaviour is that the timeout is effectively `22.7`
seconds, including all exponential backoff stages. If possible, we should make
sure that the new default behaviour at least approximately keeps this value.
For example, `20` seconds would be good enough IMO.
   
   However, as far as I understand this PR, the exponential backoff is still in
effect? If the partition is not found, the downstream client will create a
request, and this request will time out after `10s`. The client will back off
for `100ms` and send another request that will time out after another `10s`.
The client will then increase the backoff to `200ms` and retry. Rinse and
repeat until the backoff reaches `10s`, which in total means that the client
will be retrying the request for something like 80 seconds?
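   A back-of-the-envelope model of the arithmetic above, assuming the backoff
starts at `100ms`, doubles after each failed attempt, is capped at `10s`, and the
client gives up once the capped interval has been used. It reproduces the
`22.7s` figure (eight backoff stages); `retryWindowMillis` additionally charges
one full notify timeout per stage, which is only a rough model of the retry loop
described above, not the PR's actual behaviour.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffBudget {

    /**
     * Backoff intervals of the retry loop: start at initialMillis, double after every failed
     * attempt, cap at maxMillis, and give up after the capped interval has been used once.
     */
    static List<Long> backoffSchedule(long initialMillis, long maxMillis) {
        if (initialMillis <= 0) {
            throw new IllegalArgumentException("initialMillis must be positive");
        }
        List<Long> schedule = new ArrayList<>();
        long current = initialMillis;
        while (true) {
            schedule.add(current);
            if (current >= maxMillis) {
                return schedule;
            }
            current = Math.min(current * 2, maxMillis);
        }
    }

    /** Total time spent only in backoff, i.e. the pre-existing effective timeout. */
    static long totalBackoffMillis(long initialMillis, long maxMillis) {
        long total = 0;
        for (long interval : backoffSchedule(initialMillis, maxMillis)) {
            total += interval;
        }
        return total;
    }

    /**
     * Rough retry window if, in addition, one request per backoff stage waits for the full
     * server-side notify timeout before failing.
     */
    static long retryWindowMillis(long initialMillis, long maxMillis, long notifyTimeoutMillis) {
        int stages = backoffSchedule(initialMillis, maxMillis).size();
        return totalBackoffMillis(initialMillis, maxMillis) + stages * notifyTimeoutMillis;
    }
}
```

With the defaults, `totalBackoffMillis(100, 10_000)` is 22,700 ms; adding eight
`10s` notify timeouts on top pushes the window far beyond that.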



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java:
##########
@@ -51,6 +89,17 @@ public void registerResultPartition(ResultPartition partition) {
                 throw new IllegalStateException("Result partition already registered.");
             }
 
+            InputRequestNotifierManager notifiers = requestPartitionNotifiers.remove(partition.getPartitionId());
+            if (notifiers != null) {
+                for (PartitionRequestNotifier notifier : notifiers.getPartitionRequestNotifiers()) {
+                    try {
+                        notifier.notifyPartitionRequest(partition);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);

Review Comment:
   You don't need to wrap the `IOException` in a `RuntimeException`. I think
this method can easily throw `IOException` itself.
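   A minimal sketch of the propagating version, with hypothetical names and the
partition reduced to its id: `registerResultPartition` declares
`throws IOException` and lets a listener's failure reach the caller unchanged.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the listener type; the partition is represented by its id. */
interface CreationListener {
    void notifyPartitionCreated(String partitionId) throws IOException;
}

/** Illustrative registration path; names are hypothetical, not Flink's. */
final class RegistrationSketch {

    private final List<CreationListener> pending = new ArrayList<>();

    void addPendingListener(CreationListener listener) {
        pending.add(listener);
    }

    void registerResultPartition(String partitionId) throws IOException {
        List<CreationListener> toNotify = new ArrayList<>(pending);
        pending.clear();
        for (CreationListener listener : toNotify) {
            // An IOException from a listener reaches the caller as-is,
            // without an extra RuntimeException layer.
            listener.notifyPartitionCreated(partitionId);
        }
    }
}
```

In this sketch the first failing listener stops the loop, so later listeners are
not notified; whether they should be is a separate decision.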



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java:
##########
@@ -29,4 +31,28 @@ ResultSubpartitionView createSubpartitionView(
             int index,
             BufferAvailabilityListener availabilityListener)
             throws IOException;
+
+    /**
+     * If the upstream task's partition has been registered, returns the result subpartition input
+     * view immediately, otherwise save the notifier and return null.
+     *
+     * @param partitionId the result partition id
+     * @param index the index
+     * @param availabilityListener the buffer availability listener
+     * @param notifier the partition request notifier
+     * @return the result subpartition view
+     * @throws IOException the thrown exception
+     */
+    ResultSubpartitionView createSubpartitionViewOrNotify(
+            ResultPartitionID partitionId,
+            int index,
+            BufferAvailabilityListener availabilityListener,
+            PartitionRequestNotifier notifier) throws IOException;

Review Comment:
   Why does it return `ResultSubpartitionView`? I think it would be simpler if
it returned a `boolean` (true/false), indicating whether the request has been
processed immediately. Or it could even return `void`, as the result is not
used in production code, and maybe the unit tests can make their assertions in
another way?
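   A toy model of the `boolean` variant, not the PR's code: partitions are
`String` ids, and `RecordingListener` and `pendingListenerCount()` are
hypothetical helpers. The return value tells the caller whether the request was
handled immediately, and the helpers let a test check the registered listeners
and the notifications (partition and subpartition) directly, as suggested for
`ResultPartitionManagerTest`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of ResultPartitionManager; partitions are identified by String ids. */
final class ToyPartitionManager {

    /** Hypothetical test listener recording "partitionId/subpartitionIndex" notifications. */
    static final class RecordingListener {
        final int subpartitionIndex;
        final List<String> notifications = new ArrayList<>();

        RecordingListener(int subpartitionIndex) {
            this.subpartitionIndex = subpartitionIndex;
        }

        void notifyPartitionCreated(String partitionId) {
            notifications.add(partitionId + "/" + subpartitionIndex);
        }
    }

    private final Set<String> registeredPartitions = new HashSet<>();
    private final Map<String, List<RecordingListener>> pendingListeners = new HashMap<>();

    /**
     * Returns true if the partition is already registered and the request was processed
     * immediately, or false if the listener was parked until the partition is registered.
     */
    synchronized boolean registerPartitionRequestListener(String partitionId, RecordingListener listener) {
        if (registeredPartitions.contains(partitionId)) {
            listener.notifyPartitionCreated(partitionId);
            return true;
        }
        pendingListeners.computeIfAbsent(partitionId, id -> new ArrayList<>()).add(listener);
        return false;
    }

    synchronized void registerResultPartition(String partitionId) {
        if (!registeredPartitions.add(partitionId)) {
            throw new IllegalStateException("Result partition already registered.");
        }
        List<RecordingListener> waiting = pendingListeners.remove(partitionId);
        if (waiting != null) {
            for (RecordingListener listener : waiting) {
                listener.notifyPartitionCreated(partitionId);
            }
        }
    }

    /** Number of listeners still waiting for their partition. */
    synchronized int pendingListenerCount() {
        int count = 0;
        for (List<RecordingListener> listeners : pendingListeners.values()) {
            count += listeners.size();
        }
        return count;
    }
}
```

With this shape, the two test cases no longer need `assertNull`/`assertNotNull`
on a nullable view.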



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
