pnowojski commented on code in PR #19380:
URL: https://github.com/apache/flink/pull/19380#discussion_r977519676


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java:
##########
@@ -34,25 +35,26 @@ ResultSubpartitionView createSubpartitionView(
 
     /**
      * If the upstream task's partition has been registered, returns the result subpartition input
-     * view immediately, otherwise save the notifier and return null.
+     * view immediately, otherwise register the listener and return empty.
      *
      * @param partitionId the result partition id
      * @param index the index
      * @param availabilityListener the buffer availability listener
-     * @param notifier the partition request notifier
+     * @param listener the partition request listener
      * @return the result subpartition view
      * @throws IOException the thrown exception
      */
-    ResultSubpartitionView createSubpartitionViewOrNotify(
+    Optional<ResultSubpartitionView> createSubpartitionViewOrRegisterListener(
             ResultPartitionID partitionId,
             int index,
             BufferAvailabilityListener availabilityListener,
-            PartitionRequestNotifier notifier) throws IOException;
+            PartitionRequestListener listener)

Review Comment:
   `listener` -> `partitionRequestListener`
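   The contract described in this hunk (return the view when the partition is already registered, otherwise register the listener and return `Optional.empty()`) can be sketched in isolation. This is a simplified, hypothetical model, not Flink's `ResultPartitionManager`: partition ids are plain `String`s, and `SubpartitionView`, `PartitionManager`, `notifyPartitionCreated` and `pendingListenerCount` are illustrative names. The parameter already uses the suggested name `partitionRequestListener`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the "view or register listener" contract.
// None of these types are the real Flink classes.
class ViewOrListenerSketch {

    /** Stand-in for ResultSubpartitionView: only remembers what it was created for. */
    static final class SubpartitionView {
        final String partitionId;
        final int index;

        SubpartitionView(String partitionId, int index) {
            this.partitionId = partitionId;
            this.index = index;
        }
    }

    /** Stand-in for PartitionRequestListener. */
    interface PartitionRequestListener {
        int subpartitionIndex();

        /** Called once the partition is registered and a view has been created for it. */
        void notifyPartitionCreated(SubpartitionView view);
    }

    static final class PartitionManager {
        private final Set<String> registeredPartitions = new HashSet<>();
        private final Map<String, List<PartitionRequestListener>> listenerManagers = new HashMap<>();

        /** Returns the view if the partition is registered; otherwise parks the listener. */
        synchronized Optional<SubpartitionView> createSubpartitionViewOrRegisterListener(
                String partitionId, int index, PartitionRequestListener partitionRequestListener) {
            if (registeredPartitions.contains(partitionId)) {
                return Optional.of(new SubpartitionView(partitionId, index));
            }
            listenerManagers
                    .computeIfAbsent(partitionId, id -> new ArrayList<>())
                    .add(partitionRequestListener);
            return Optional.empty();
        }

        /** Registers the partition and hands a view to every listener waiting for it. */
        synchronized void registerResultPartition(String partitionId) {
            registeredPartitions.add(partitionId);
            List<PartitionRequestListener> waiting = listenerManagers.remove(partitionId);
            if (waiting != null) {
                // Simplification: listeners are notified while holding the lock.
                for (PartitionRequestListener listener : waiting) {
                    listener.notifyPartitionCreated(
                            new SubpartitionView(partitionId, listener.subpartitionIndex()));
                }
            }
        }

        synchronized int pendingListenerCount() {
            int count = 0;
            for (List<PartitionRequestListener> listeners : listenerManagers.values()) {
                count += listeners.size();
            }
            return count;
        }
    }
}
```

   Returning `Optional` makes the "not registered yet" case explicit at the call site (`isPresent()`), where the old `createSubpartitionViewOrNotify` signalled it with `null`.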



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java:
##########
@@ -68,27 +65,83 @@ public void testCreateViewForRegisteredPartition() throws Exception {
                 partition.getPartitionId(), 0, new NoOpBufferAvailablityListener());
     }
 
+    /**
+     * {@link ResultPartitionManager} creates subpartition view reader after the given partition is
+     * registered.
+     */
     @Test
-    public void testCreateViewNotifierAfterRegisteredPartition() throws Exception {
+    public void testCreateSubpartitionViewAfterRegisteredPartition() throws Exception {
         final ResultPartitionManager partitionManager = new ResultPartitionManager();
         final ResultPartition partition = createPartition();
 
+        assertTrue(partitionManager.getListenerManagers().isEmpty());
+
         partitionManager.registerResultPartition(partition);
-        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
-        assertNotNull(partitionManager.createSubpartitionViewOrNotify(
-                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));
+        PartitionRequestListener partitionRequestListener =
+                TestingPartitionRequestListener.newBuilder().build();
+        assertTrue(
+                partitionManager
+                        .createSubpartitionViewOrRegisterListener(
+                                partition.getPartitionId(),
+                                0,
+                                new NoOpBufferAvailablityListener(),
+                                partitionRequestListener)
+                        .isPresent());
+        assertTrue(partitionManager.getListenerManagers().isEmpty());
     }
 
+    /**
+     * The {@link ResultPartitionManager} registers a {@link PartitionRequestListener} before the specified
+     * {@link ResultPartition} is registered. When the {@link ResultPartition} is registered, the
+     * {@link ResultPartitionManager} finds the listener and creates the partition view reader.
+     */
     @Test
-    public void testCreateViewNotifierBeforeRegisteredPartition() throws Exception {
+    public void testRegisterPartitionListenerBeforeRegisteredPartition() throws Exception {
         final ResultPartitionManager partitionManager = new ResultPartitionManager();
         final ResultPartition partition = createPartition();
 
-        PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier();
-        assertNull(partitionManager.createSubpartitionViewOrNotify(
-                partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier));
+        assertTrue(partitionManager.getListenerManagers().isEmpty());
+
+        final CompletableFuture<ResultPartition> notifySubpartitionCreatedFuture =
+                new CompletableFuture<>();
+        PartitionRequestListener partitionRequestListener =
+                TestingPartitionRequestListener.newBuilder()
+                        .setResultPartitionId(partition.getPartitionId())
+                        .setNetworkSequenceViewReader(
+                                TestingSubpartitionCreatedViewReader.newBuilder()
+                                        .setNotifySubpartitionCreatedConsumer(
+                                                tuple ->
+                                                        notifySubpartitionCreatedFuture.complete(
+                                                                tuple.f0))
+                                        .build())
+                        .build();
+        assertFalse(
+                partitionManager
+                        .createSubpartitionViewOrRegisterListener(
+                                partition.getPartitionId(),
+                                0,
+                                new NoOpBufferAvailablityListener(),
+                                partitionRequestListener)
+                        .isPresent());
+        assertEquals(partitionManager.getListenerManagers().size(), 1);
+
+        // Check if the partition request listener is registered.
+        PartitionRequestListenerManager listenerManager =
+                partitionManager.getListenerManagers().get(partition.getPartitionId());
+        assertNotNull(listenerManager);
+        assertFalse(listenerManager.isEmpty());
+        assertEquals(listenerManager.getPartitionRequestNotifiers().size(), 1);
+        PartitionRequestListener listener =
+                listenerManager.getPartitionRequestNotifiers().iterator().next();
+        assertEquals(listener.getResultPartitionId(), partition.getPartitionId());

Review Comment:
   add `assertFalse(notifySubpartitionCreatedFuture.isDone())` before the `registerResultPartition()` call?
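   The extra assertion guards against a listener that fires too early. A hypothetical, simplified illustration (not the Flink test classes): `Manager`, `DeferredManager`, `EagerManager` and the `String` partition ids are stand-ins, whereas the real test completes the future with a `ResultPartition`. An eager implementation passes a check that only inspects the end state, but fails once `isDone()` is asserted before `registerResultPartition()`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of why assertFalse(future.isDone()) before registration matters.
class NotifyOrderingSketch {

    /** Minimal stand-in for the manager under test; names are illustrative only. */
    interface Manager {
        void registerListener(String partitionId, CompletableFuture<String> onCreated);

        void registerResultPartition(String partitionId);
    }

    /** Expected behaviour: the listener fires only once the partition is registered. */
    static final class DeferredManager implements Manager {
        private String pendingId;
        private CompletableFuture<String> pending;

        @Override
        public void registerListener(String partitionId, CompletableFuture<String> onCreated) {
            pendingId = partitionId;
            pending = onCreated;
        }

        @Override
        public void registerResultPartition(String partitionId) {
            if (pending != null && partitionId.equals(pendingId)) {
                pending.complete(partitionId);
                pending = null;
            }
        }
    }

    /** Buggy behaviour: the listener fires immediately, before the partition exists. */
    static final class EagerManager implements Manager {
        @Override
        public void registerListener(String partitionId, CompletableFuture<String> onCreated) {
            onCreated.complete(partitionId);
        }

        @Override
        public void registerResultPartition(String partitionId) {}
    }

    /** Checks only the state after registerResultPartition(). */
    static boolean passesEndStateCheck(Manager manager) {
        CompletableFuture<String> notifySubpartitionCreatedFuture = new CompletableFuture<>();
        manager.registerListener("p1", notifySubpartitionCreatedFuture);
        manager.registerResultPartition("p1");
        return "p1".equals(notifySubpartitionCreatedFuture.getNow(null));
    }

    /** Same check plus the suggested "not done yet" assertion before registration. */
    static boolean passesOrderingCheck(Manager manager) {
        CompletableFuture<String> notifySubpartitionCreatedFuture = new CompletableFuture<>();
        manager.registerListener("p1", notifySubpartitionCreatedFuture);
        if (notifySubpartitionCreatedFuture.isDone()) {
            return false; // notified before the partition was registered
        }
        manager.registerResultPartition("p1");
        return "p1".equals(notifySubpartitionCreatedFuture.getNow(null));
    }
}
```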
   



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -451,13 +451,14 @@ public class NettyShuffleEnvironmentOptions {
                     .withDescription(
                             "Maximum backoff in milliseconds for partition requests of input channels.");
 
-    /** The timeout for partition request notifier. */
+    /** The timeout for partition request listener in result partition manager. */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
-    public static final ConfigOption<Long> NETWORK_REQUEST_NOTIFY_TIMEOUT =
-            key("taskmanager.network.partition-request-notify-timeout-ms")
-                    .longType()
-                    .defaultValue(10000L)
-                    .withDescription("Timeout in milliseconds for partition request notifier in result partition manager.");
+    public static final ConfigOption<Integer> NETWORK_REQUEST_LISTENER_TIMEOUT =
+            key("taskmanager.network.partition-request-listener-timeout-ms")

Review Comment:
   rename to `taskmanager.network.partition-request-timeout`, and change the type to `ConfigOption<Duration>`?
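   With a `ConfigOption<Duration>` (in Flink typically declared via `key(...).durationType().defaultValue(Duration.ofSeconds(10))`, keeping a 10 s default), the key no longer needs an `-ms` suffix because the unit travels with the value. `increaseBackoff(int timeoutMS)` in this PR still takes an `int`, so the value would need converting at the call site. A JDK-only sketch of that conversion, where `toTimeoutMillis` is a hypothetical helper:

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: converts a Duration-typed timeout into the int
// milliseconds that increaseBackoff(int timeoutMS) in this PR expects.
class TimeoutConversionSketch {

    private static final Duration MAX_INT_MILLIS = Duration.ofMillis(Integer.MAX_VALUE);

    /** Returns the timeout in whole milliseconds, clamped to Integer.MAX_VALUE; rejects negatives. */
    static int toTimeoutMillis(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("Timeout must not be negative: " + timeout);
        }
        // Compare before calling toMillis() so very large durations neither overflow the int
        // nor make toMillis() throw ArithmeticException.
        if (timeout.compareTo(MAX_INT_MILLIS) >= 0) {
            return Integer.MAX_VALUE;
        }
        return (int) timeout.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toTimeoutMillis(Duration.ofSeconds(10))); // 10000
    }
}
```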



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -276,21 +277,36 @@ protected int getCurrentBackoff() {
      * @return <code>true</code>, iff the operation was successful. Otherwise, <code>false</code>.
      */
     protected boolean increaseBackoff() {
+        return increaseBackoff(0);
+    }
+
+    /**
+     * The remote task manager creates a partition request listener and returns {@link
+     * PartitionNotFoundException} when the listener times out, so the backoff should add the
+     * timeout in milliseconds if one is set.
+     *
+     * @param timeoutMS the partition request listener timeout in milliseconds
+     * @return <code>true</code>, iff the operation was successful. Otherwise, <code>false</code>.
+     */
+    protected boolean increaseBackoff(int timeoutMS) {

Review Comment:
   I see that you kept the original backoff logic and just took this timeout into account. Are you sure this is the right logic? What if the request fails immediately, without waiting for this `partition-request-notify-timeout-ms`, because the upstream TM hasn't started yet? Wouldn't that mean that the backoffs would not work at all, and the requests would fail within a couple of ms after a couple of retries?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -276,21 +277,36 @@ protected int getCurrentBackoff() {
      * @return <code>true</code>, iff the operation was successful. Otherwise, <code>false</code>.
      */
     protected boolean increaseBackoff() {
+        return increaseBackoff(0);
+    }
+
+    /**
+     * The remote task manager creates a partition request listener and returns {@link
+     * PartitionNotFoundException} when the listener times out, so the backoff should add the
+     * timeout in milliseconds if one is set.
+     *
+     * @param timeoutMS the partition request listener timeout in milliseconds
+     * @return <code>true</code>, iff the operation was successful. Otherwise, <code>false</code>.
+     */
+    protected boolean increaseBackoff(int timeoutMS) {
         // Backoff is disabled
         if (currentBackoff < 0) {
             return false;
         }
 
         // This is the first time backing off
         if (currentBackoff == 0) {
-            currentBackoff = initialBackoff;
+            currentBackoff = timeoutMS > 0 ? timeoutMS : initialBackoff;
 
             return true;
         }
 
         // Continue backing off
         else if (currentBackoff < maxBackoff) {
-            currentBackoff = Math.min(currentBackoff * 2, maxBackoff);
+            currentBackoff =
+                    timeoutMS > 0
+                            ? Math.min(currentBackoff + timeoutMS, maxBackoff)
+                            : Math.min(currentBackoff * 2, maxBackoff);

Review Comment:
   Why do we have two different conditions here? Especially since, with the default value of `timeoutMS` being 10s and `maxBackoff` also being 10s by default, both conditions return `maxBackoff` anyway?
   
   If so, maybe let's keep the original logic for the sake of simplicity?
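   The point can be checked by porting the `increaseBackoff(int timeoutMS)` logic from this diff into a standalone class and calling it until it returns `false`. A sketch, assuming `initialBackoff` = 100 ms and `maxBackoff` = 10 s (assumed defaults for `taskmanager.network.request-backoff.initial` / `.max`) and the 10 s timeout mentioned above; `BackoffScheduleSketch` and `schedule` are hypothetical names, and the class only tracks the backoff value without scheduling any requests:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone port of increaseBackoff(int timeoutMS) from this diff (hypothetical class name).
class BackoffScheduleSketch {
    private final int initialBackoff;
    private final int maxBackoff;
    private int currentBackoff;

    BackoffScheduleSketch(int initialBackoff, int maxBackoff) {
        this.initialBackoff = initialBackoff;
        this.maxBackoff = maxBackoff;
        // Assumed to mirror InputChannel: initial backoff 0 disables backoff (-1), else start at 0.
        this.currentBackoff = initialBackoff == 0 ? -1 : 0;
    }

    boolean increaseBackoff(int timeoutMS) {
        // Backoff is disabled
        if (currentBackoff < 0) {
            return false;
        }
        // First time backing off: a positive timeout replaces initialBackoff
        if (currentBackoff == 0) {
            currentBackoff = timeoutMS > 0 ? timeoutMS : initialBackoff;
            return true;
        }
        // Continue backing off: additive with a timeout, doubling without one
        if (currentBackoff < maxBackoff) {
            currentBackoff =
                    timeoutMS > 0
                            ? Math.min(currentBackoff + timeoutMS, maxBackoff)
                            : Math.min(currentBackoff * 2, maxBackoff);
            return true;
        }
        // Reached maximum backoff
        return false;
    }

    /** Calls increaseBackoff(timeoutMS) until it returns false and collects every backoff value. */
    static List<Integer> schedule(int initialBackoff, int maxBackoff, int timeoutMS) {
        BackoffScheduleSketch channel = new BackoffScheduleSketch(initialBackoff, maxBackoff);
        List<Integer> backoffs = new ArrayList<>();
        while (channel.increaseBackoff(timeoutMS)) {
            backoffs.add(channel.currentBackoff);
        }
        return backoffs;
    }

    public static void main(String[] args) {
        System.out.println(schedule(100, 10_000, 0));      // [100, 200, 400, 800, 1600, 3200, 6400, 10000]
        System.out.println(schedule(100, 10_000, 10_000)); // [10000]
    }
}
```

   With these values the timeout path produces a single 10 s backoff and then `increaseBackoff` returns `false`, so the `Math.min(currentBackoff + timeoutMS, maxBackoff)` branch is never reached, while the doubling path backs off eight times. The shrunken retry budget is also relevant to the earlier question about requests that fail immediately.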



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
