pnowojski commented on code in PR #19380:
URL: https://github.com/apache/flink/pull/19380#discussion_r977519676
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java:
##########
@@ -34,25 +35,26 @@ ResultSubpartitionView createSubpartitionView(
/**
* If the upstream task's partition has been registered, returns the
result subpartition input
- * view immediately, otherwise save the notifier and return null.
+ * view immediately, otherwise register the listener and return empty.
*
* @param partitionId the result partition id
* @param index the index
* @param availabilityListener the buffer availability listener
- * @param notifier the partition request notifier
+ * @param listener the partition request listener
* @return the result subpartition view
* @throws IOException the thrown exception
*/
- ResultSubpartitionView createSubpartitionViewOrNotify(
+ Optional<ResultSubpartitionView> createSubpartitionViewOrRegisterListener(
ResultPartitionID partitionId,
int index,
BufferAvailabilityListener availabilityListener,
- PartitionRequestNotifier notifier) throws IOException;
+ PartitionRequestListener listener)
Review Comment:
`listener` -> `partitionRequestListener`
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java:
##########
@@ -68,27 +65,83 @@ public void testCreateViewForRegisteredPartition() throws
Exception {
partition.getPartitionId(), 0, new
NoOpBufferAvailablityListener());
}
+ /**
+ * {@link ResultPartitionManager} creates subpartition view reader after
the given partition is
+ * registered.
+ */
@Test
- public void testCreateViewNotifierAfterRegisteredPartition() throws
Exception {
+ public void testCreateSubpartitionViewAfterRegisteredPartition() throws
Exception {
final ResultPartitionManager partitionManager = new
ResultPartitionManager();
final ResultPartition partition = createPartition();
+ assertTrue(partitionManager.getListenerManagers().isEmpty());
+
partitionManager.registerResultPartition(partition);
- PartitionRequestNotifier partitionRequestNotifier = new
TestingPartitionRequestNotifier();
- assertNotNull(partitionManager.createSubpartitionViewOrNotify(
- partition.getPartitionId(), 0, new
NoOpBufferAvailablityListener(), partitionRequestNotifier));
+ PartitionRequestListener partitionRequestListener =
+ TestingPartitionRequestListener.newBuilder().build();
+ assertTrue(
+ partitionManager
+ .createSubpartitionViewOrRegisterListener(
+ partition.getPartitionId(),
+ 0,
+ new NoOpBufferAvailablityListener(),
+ partitionRequestListener)
+ .isPresent());
+ assertTrue(partitionManager.getListenerManagers().isEmpty());
}
+ /**
+ * The {@link ResultPartitionManager} registers {@link
PartitionRequestListener} before specify
+ * {@link ResultPartition} is registered. When the {@link ResultPartition}
is registered, the
+ * {@link ResultPartitionManager} will find the listener and create
partition view reader. an
+ */
@Test
- public void testCreateViewNotifierBeforeRegisteredPartition() throws
Exception {
+ public void testRegisterPartitionListenerBeforeRegisteredPartition()
throws Exception {
final ResultPartitionManager partitionManager = new
ResultPartitionManager();
final ResultPartition partition = createPartition();
- PartitionRequestNotifier partitionRequestNotifier = new
TestingPartitionRequestNotifier();
- assertNull(partitionManager.createSubpartitionViewOrNotify(
- partition.getPartitionId(), 0, new
NoOpBufferAvailablityListener(), partitionRequestNotifier));
+ assertTrue(partitionManager.getListenerManagers().isEmpty());
+
+ final CompletableFuture<ResultPartition>
notifySubpartitionCreatedFuture =
+ new CompletableFuture<>();
+ PartitionRequestListener partitionRequestListener =
+ TestingPartitionRequestListener.newBuilder()
+ .setResultPartitionId(partition.getPartitionId())
+ .setNetworkSequenceViewReader(
+
TestingSubpartitionCreatedViewReader.newBuilder()
+ .setNotifySubpartitionCreatedConsumer(
+ tuple ->
+
notifySubpartitionCreatedFuture.complete(
+ tuple.f0))
+ .build())
+ .build();
+ assertFalse(
+ partitionManager
+ .createSubpartitionViewOrRegisterListener(
+ partition.getPartitionId(),
+ 0,
+ new NoOpBufferAvailablityListener(),
+ partitionRequestListener)
+ .isPresent());
+ assertEquals(partitionManager.getListenerManagers().size(), 1);
+
+ // Check if the partition request listener is registered.
+ PartitionRequestListenerManager listenerManager =
+
partitionManager.getListenerManagers().get(partition.getPartitionId());
+ assertNotNull(listenerManager);
+ assertFalse(listenerManager.isEmpty());
+ assertEquals(listenerManager.getPartitionRequestNotifiers().size(), 1);
+ PartitionRequestListener listener =
+
listenerManager.getPartitionRequestNotifiers().iterator().next();
+ assertEquals(listener.getResultPartitionId(),
partition.getPartitionId());
Review Comment:
Add `assertFalse(notifySubpartitionCreatedFuture.isDone())` before the
`registerResultPartition()` call?
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -451,13 +451,14 @@ public class NettyShuffleEnvironmentOptions {
.withDescription(
"Maximum backoff in milliseconds for partition
requests of input channels.");
- /** The timeout for partition request notifier. */
+ /** The timeout for partition request listener in result partition
manager. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
- public static final ConfigOption<Long> NETWORK_REQUEST_NOTIFY_TIMEOUT =
- key("taskmanager.network.partition-request-notify-timeout-ms")
- .longType()
- .defaultValue(10000L)
- .withDescription("Timeout in milliseconds for partition
request notifier in result partition manager.");
+ public static final ConfigOption<Integer> NETWORK_REQUEST_LISTENER_TIMEOUT
=
+ key("taskmanager.network.partition-request-listener-timeout-ms")
Review Comment:
rename to `taskmanager.network.partition-request-timeout`, and change the
type to `ConfigOption<Duration>`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -276,21 +277,36 @@ protected int getCurrentBackoff() {
* @return <code>true</code>, iff the operation was successful. Otherwise,
<code>false</code>.
*/
protected boolean increaseBackoff() {
+ return increaseBackoff(0);
+ }
+
+ /**
+ * The remote task manager creates partition request listener and returns
{@link
+ * PartitionNotFoundException} until the listener is timeout, so the
backoff should add the
+ * timeout milliseconds if it exists.
+ *
+ * @param timeoutMS The timeout milliseconds that the partition request
listener timeout
+ * @return <code>true</code>, iff the operation was successful. Otherwise,
<code>false</code>.
+ */
+ protected boolean increaseBackoff(int timeoutMS) {
Review Comment:
I see that you kept the original backoff logic and just took this timeout
into account. Are you sure this is the right logic? What if the
request fails immediately, without waiting for this
`partition-request-notify-timeout-ms`, because the upstream TM hasn't started yet?
Wouldn't that mean that the backoffs would not work at all, and the requests
would fail within a couple of ms after a couple of retries?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java:
##########
@@ -276,21 +277,36 @@ protected int getCurrentBackoff() {
* @return <code>true</code>, iff the operation was successful. Otherwise,
<code>false</code>.
*/
protected boolean increaseBackoff() {
+ return increaseBackoff(0);
+ }
+
+ /**
+ * The remote task manager creates partition request listener and returns
{@link
+ * PartitionNotFoundException} until the listener is timeout, so the
backoff should add the
+ * timeout milliseconds if it exists.
+ *
+ * @param timeoutMS The timeout milliseconds that the partition request
listener timeout
+ * @return <code>true</code>, iff the operation was successful. Otherwise,
<code>false</code>.
+ */
+ protected boolean increaseBackoff(int timeoutMS) {
// Backoff is disabled
if (currentBackoff < 0) {
return false;
}
// This is the first time backing off
if (currentBackoff == 0) {
- currentBackoff = initialBackoff;
+ currentBackoff = timeoutMS > 0 ? timeoutMS : initialBackoff;
return true;
}
// Continue backing off
else if (currentBackoff < maxBackoff) {
- currentBackoff = Math.min(currentBackoff * 2, maxBackoff);
+ currentBackoff =
+ timeoutMS > 0
+ ? Math.min(currentBackoff + timeoutMS, maxBackoff)
+ : Math.min(currentBackoff * 2, maxBackoff);
Review Comment:
Why do we have two different conditions here? Especially since, with the
default value of `timeoutMS` being 10s and `maxBackoff` also being 10s by default,
both conditions return `maxBackoff` anyway.
If so, maybe let's keep the original logic for the sake of simplicity?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]