pnowojski commented on code in PR #19380: URL: https://github.com/apache/flink/pull/19380#discussion_r953608744
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionRequestNotifier.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; + +import java.io.IOException; + +/** + * When the netty server receives a downstream task's partition request event and finds its upstream task doesn't register its partition yet, + * the netty server will construct a {@link PartitionRequestNotifier} and notify the request when the task deploys itself and + * registers its partition to {@link ResultPartitionManager}. + */ +public interface PartitionRequestNotifier { + + /** + * The creation timestamp of this notifier, it's used to check whether the notifier is timeout. + * + * @return the creation timestamp + */ + long getCreateTimestamp(); + + /** + * Get the result partition id of the notifier. + * + * @return the result partition id + */ + ResultPartitionID getResultPartitionId(); + + /** + * Get the view reader of the notifier. 
+ * + * @return the view reader + */ + NetworkSequenceViewReader getViewReader(); + + /** + * Get the input channel id of the notifier. + * + * @return the input channel id + */ + InputChannelID getReceiverId(); + + /** + * Notify the pending partition request when the given partition is registered. + * + * @param partition The registered partition. + */ + void notifyPartitionRequest(ResultPartition partition) throws IOException; + + /** + * When the partition request notifier is timeout, it needs to notify {@link NetworkSequenceViewReader} of the message. + */ + void notifyPartitionRequestTimeout(); Review Comment: `notifyPartitionCreated` and `notifyPartitionCreatedTimeout`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionRequestNotifier.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; + +import java.io.IOException; + +/** + * When the netty server receives a downstream task's partition request event and finds its upstream task doesn't register its partition yet, + * the netty server will construct a {@link PartitionRequestNotifier} and notify the request when the task deploys itself and + * registers its partition to {@link ResultPartitionManager}. + */ +public interface PartitionRequestNotifier { Review Comment: Shouldn't this be called `PartitionCreationListener`? This is NOT notifying about a partition request. Rather, the partition request creates/uses this "listener" when the partition hasn't been created yet, to listen for the partition creation notification. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/InputRequestNotifierManager.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Manages partition request notifier with input channel id. + */ +public class InputRequestNotifierManager { Review Comment: `PartitionCreationListenerManager`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java: ########## @@ -40,6 +45,30 @@ void requestSubpartitionView( int subPartitionIndex) throws IOException; + /** + * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request. + * + * @param partition the result partition + * @param subPartitionIndex the sub partition index + * @throws IOException the thrown exception + */ + void requestSubpartitionView(ResultPartition partition, int subPartitionIndex) throws IOException; Review Comment: Can you remove the old version of this method? Having two `requestSubpartitionView` methods is confusing, especially since the old one seems to be used only in tests after your change. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java: ########## @@ -40,6 +45,30 @@ void requestSubpartitionView( int subPartitionIndex) throws IOException; + /** + * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request. + * + * @param partition the result partition + * @param subPartitionIndex the sub partition index + * @throws IOException the thrown exception + */ + void requestSubpartitionView(ResultPartition partition, int subPartitionIndex) throws IOException; Review Comment: maybe rename this one to `notifySubpartitionCreated`?
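For illustration, a minimal sketch of how the renames proposed above (`PartitionCreationListener` with `notifyPartitionCreated`/`notifyPartitionCreatedTimeout`, and `PartitionCreationListenerManager`) might fit together. Partition and input channel ids are simplified to `String`, and the manager's `add`/`remove`/`getListeners` methods plus the `RecordingListener` helper are hypothetical, not Flink's API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical rename of PartitionRequestNotifier: it listens for partition creation. */
interface PartitionCreationListener {
    String getReceiverId(); // simplified stand-in for InputChannelID

    void notifyPartitionCreated(String partitionId);

    void notifyPartitionCreatedTimeout();
}

/** Hypothetical rename of InputRequestNotifierManager: pending listeners of ONE partition. */
class PartitionCreationListenerManager {
    // Keyed by receiver (input channel) id; insertion order is preserved.
    private final Map<String, PartitionCreationListener> listeners = new LinkedHashMap<>();

    void add(PartitionCreationListener listener) {
        listeners.put(listener.getReceiverId(), listener);
    }

    PartitionCreationListener remove(String receiverId) {
        return listeners.remove(receiverId);
    }

    Collection<PartitionCreationListener> getListeners() {
        return listeners.values();
    }

    boolean isEmpty() {
        return listeners.isEmpty();
    }
}

/** Records notifications so the renamed API can be exercised without Flink. */
class RecordingListener implements PartitionCreationListener {
    private final String receiverId;
    final List<String> createdPartitions = new ArrayList<>();
    boolean timedOut;

    RecordingListener(String receiverId) {
        this.receiverId = receiverId;
    }

    @Override
    public String getReceiverId() {
        return receiverId;
    }

    @Override
    public void notifyPartitionCreated(String partitionId) {
        createdPartitions.add(partitionId);
    }

    @Override
    public void notifyPartitionCreatedTimeout() {
        timedOut = true;
    }
}
```

Keying by receiver id mirrors the existing `InputRequestNotifierManager` description (notifiers managed per input channel id), with one manager per still-missing partition.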
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java: ########## @@ -40,6 +45,30 @@ void requestSubpartitionView( int subPartitionIndex) throws IOException; + /** + * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request. + * + * @param partition the result partition + * @param subPartitionIndex the sub partition index + * @throws IOException the thrown exception + */ + void requestSubpartitionView(ResultPartition partition, int subPartitionIndex) throws IOException; + + /** + * When the netty server receives the downstream task's partition request and the upstream task has registered its partition, + * it will process the partition request immediately, otherwise it will create a {@link PartitionRequestNotifier} for given {@link ResultPartitionID} + * in {@link ResultPartitionManager} and notify the request when the upstream task registers its partition. + * + * @param partitionProvider the result partition provider + * @param resultPartitionId the result partition id + * @param subPartitionIndex the sub partition index + * @throws IOException the thrown exception + */ + void requestSubpartitionViewOrNotify( Review Comment: maybe rename this to something like `requestSubpartitionView` or `requestSubpartitionViewOrRegisterNotifier`? I think the method above actually deserves the `notify` name more, as notifying is its main purpose, while here we are requesting a subpartition or, in other words, registering a notifier (and only _maybe_ triggering the notification).
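A minimal sketch of the split argued for above: one entry point that either serves the request immediately or registers the reader, and one callback, `notifySubpartitionCreated`, that runs once the partition exists. All types are simplified, hypothetical stand-ins (`String` ids and views), and the method names follow the review suggestions rather than any final Flink API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for ResultPartitionProvider. */
interface SketchPartitionProvider {
    /** Creates the view now if the partition exists (returns true), otherwise parks the request. */
    boolean createSubpartitionViewOrRegisterListener(String partitionId, int index, SketchReader reader);
}

/** Hypothetical, simplified stand-in for CreditBasedSequenceNumberingViewReader. */
class SketchReader {
    private final Object requestLock = new Object();
    private String subpartitionView; // stand-in for ResultSubpartitionView

    /** Called on a partition request: may only register this reader as a listener. */
    boolean requestSubpartitionViewOrRegisterListener(
            SketchPartitionProvider provider, String partitionId, int index) {
        return provider.createSubpartitionViewOrRegisterListener(partitionId, index, this);
    }

    /** Called once the partition exists: actually creates the view. */
    void notifySubpartitionCreated(String partitionId, int index) {
        synchronized (requestLock) {
            if (subpartitionView != null) {
                throw new IllegalStateException("Subpartition already requested");
            }
            subpartitionView = partitionId + "#" + index;
        }
    }

    String getSubpartitionView() {
        synchronized (requestLock) {
            return subpartitionView;
        }
    }
}

/** Keeps one deferred callback per partition id, for brevity. */
class SketchProvider implements SketchPartitionProvider {
    private final Set<String> registered = new HashSet<>();
    private final Map<String, Runnable> pending = new HashMap<>();

    @Override
    public boolean createSubpartitionViewOrRegisterListener(
            String partitionId, int index, SketchReader reader) {
        if (registered.contains(partitionId)) {
            reader.notifySubpartitionCreated(partitionId, index); // served immediately
            return true;
        }
        pending.put(partitionId, () -> reader.notifySubpartitionCreated(partitionId, index));
        return false;
    }

    void registerPartition(String partitionId) {
        registered.add(partitionId);
        Runnable callback = pending.remove(partitionId);
        if (callback != null) {
            callback.run(); // the "notify" path
        }
    }
}
```

The naming then reads naturally: the request-side method only *maybe* creates the view, while `notifySubpartitionCreated` always does.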
########## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java: ########## @@ -480,6 +493,7 @@ public int hashCode() { result = 31 * result + networkBufferSize; result = 31 * result + partitionRequestInitialBackoff; result = 31 * result + partitionRequestMaxBackoff; + result = 31 * result + partitionRequestNotifyTimeout.hashCode(); Review Comment: you forgot to modify `equals` method as well. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ########## @@ -101,6 +105,55 @@ public void requestSubpartitionView( notifyDataAvailable(); } + @Override + public void requestSubpartitionView( + ResultPartition partition, + int subPartitionIndex) throws IOException { + synchronized (requestLock) { + if (subpartitionView == null) { + subpartitionView = partition.createSubpartitionView(subPartitionIndex, this); + } else { + throw new IllegalStateException("Subpartition already requested"); + } + } + + notifyDataAvailable(); + } + + @Override + public void requestSubpartitionViewOrNotify( + ResultPartitionProvider partitionProvider, + ResultPartitionID resultPartitionId, + int subPartitionIndex) throws IOException { + synchronized (requestLock) { + if (subpartitionView == null) { + if (partitionRequestNotifier == null) { + partitionRequestNotifier = new NettyPartitionRequestNotifier( + partitionProvider, + this, + subPartitionIndex, + resultPartitionId); + } else { + throw new IllegalStateException("Partition request notifier already created"); + } + // The partition provider will create subpartitionView if resultPartition is registered, + // otherwise it will return null and add a notifier of the given result partition id. + this.subpartitionView = partitionProvider.createSubpartitionViewOrNotify( + resultPartitionId, + subPartitionIndex, + this, + partitionRequestNotifier); + if (subpartitionView == null) { + return; + } Review Comment: no-op? 
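The `equals` gap flagged above matters because `equals` and `hashCode` must stay consistent: every field mixed into the hash should also be compared. A minimal sketch with a trimmed, hypothetical stand-in for `NettyShuffleEnvironmentConfiguration`, keeping only the backoff fields and the new timeout, which is assumed here to be a `java.time.Duration` (the diff only shows that it is an object with `hashCode()`).

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical, trimmed configuration class; only the fields relevant to the comment. */
final class SketchNetworkConfig {
    private final int partitionRequestInitialBackoff;
    private final int partitionRequestMaxBackoff;
    private final Duration partitionRequestNotifyTimeout;

    SketchNetworkConfig(int initialBackoff, int maxBackoff, Duration notifyTimeout) {
        this.partitionRequestInitialBackoff = initialBackoff;
        this.partitionRequestMaxBackoff = maxBackoff;
        this.partitionRequestNotifyTimeout = Objects.requireNonNull(notifyTimeout);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SketchNetworkConfig that = (SketchNetworkConfig) obj;
        return partitionRequestInitialBackoff == that.partitionRequestInitialBackoff
                && partitionRequestMaxBackoff == that.partitionRequestMaxBackoff
                // The field added to hashCode() must be compared here as well.
                && partitionRequestNotifyTimeout.equals(that.partitionRequestNotifyTimeout);
    }

    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + partitionRequestInitialBackoff;
        result = 31 * result + partitionRequestMaxBackoff;
        result = 31 * result + partitionRequestNotifyTimeout.hashCode();
        return result;
    }
}
```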
########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java: ########## @@ -55,6 +68,29 @@ public void testCreateViewForRegisteredPartition() throws Exception { partition.getPartitionId(), 0, new NoOpBufferAvailablityListener()); } + @Test + public void testCreateViewNotifierAfterRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + partitionManager.registerResultPartition(partition); + PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier(); + assertNotNull(partitionManager.createSubpartitionViewOrNotify( + partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier)); + } + + @Test + public void testCreateViewNotifierBeforeRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier(); + assertNull(partitionManager.createSubpartitionViewOrNotify( + partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier)); + Review Comment: instead of `assertNull`, you should: 1. check that the list of registered listeners/notifiers in the `ResultPartitionManager` is NOT empty 2. assert that `partitionRequestNotifier` has NOT been notified ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java: ########## @@ -79,8 +128,47 @@ public ResultSubpartitionView createSubpartitionView( return subpartitionView; } + @Override + public ResultSubpartitionView createSubpartitionViewOrNotify( Review Comment: rename to something like `registerPartitionRequestListener` or `registerPartitionRequestNotifier`?
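A minimal, Flink-free sketch of the test shape suggested above for the "before registration" case: instead of asserting on a `null` return value, inspect the pending listeners and the listener's recorded notifications, then check both again once the partition is registered. `CreationListener`, `TestingCreationListener`, `SketchPartitionManager` and its `getPendingListenerCount` accessor are hypothetical test helpers, not existing Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical listener interface for this sketch. */
interface CreationListener {
    void notifyPartitionCreated(String partitionId, int subpartitionIndex);
}

/** Test double that only records what it was told, so tests can assert on it. */
class TestingCreationListener implements CreationListener {
    final List<String> notifications = new ArrayList<>();

    @Override
    public void notifyPartitionCreated(String partitionId, int subpartitionIndex) {
        notifications.add(partitionId + "/" + subpartitionIndex);
    }
}

/** Minimal manager that parks listeners until their partition is registered. */
class SketchPartitionManager {
    private final Set<String> registered = new HashSet<>();
    private final Map<String, List<Runnable>> pending = new HashMap<>();

    void registerPartitionRequestListener(String partitionId, int index, CreationListener listener) {
        if (registered.contains(partitionId)) {
            listener.notifyPartitionCreated(partitionId, index);
            return;
        }
        pending.computeIfAbsent(partitionId, id -> new ArrayList<>())
                .add(() -> listener.notifyPartitionCreated(partitionId, index));
    }

    void registerResultPartition(String partitionId) {
        registered.add(partitionId);
        List<Runnable> callbacks = pending.remove(partitionId);
        if (callbacks != null) {
            callbacks.forEach(Runnable::run);
        }
    }

    /** Test-only accessor: how many listeners are still waiting. */
    int getPendingListenerCount() {
        int count = 0;
        for (List<Runnable> callbacks : pending.values()) {
            count += callbacks.size();
        }
        return count;
    }
}
```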
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkSequenceViewReader.java: ########## @@ -40,6 +45,30 @@ void requestSubpartitionView( int subPartitionIndex) throws IOException; + /** + * The {@link PartitionRequestNotifier} notify and process the downstream task's partition request. + * + * @param partition the result partition + * @param subPartitionIndex the sub partition index + * @throws IOException the thrown exception + */ Review Comment: Can you explain in the javadoc how this method and `requestSubpartitionViewOrNotify` should be used? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java: ########## @@ -51,6 +89,17 @@ public void registerResultPartition(ResultPartition partition) { throw new IllegalStateException("Result partition already registered."); } + InputRequestNotifierManager notifiers = requestPartitionNotifiers.remove(partition.getPartitionId()); + if (notifiers != null) { + for (PartitionRequestNotifier notifier : notifiers.getPartitionRequestNotifiers()) { + try { + notifier.notifyPartitionRequest(partition); Review Comment: rename to `notifier.notifyPartitionCreated(partition)`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java: ########## @@ -88,10 +88,8 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws new CreditBasedSequenceNumberingViewReader( request.receiverId, request.credit, outboundQueue); - reader.requestSubpartitionView( + reader.requestSubpartitionViewOrNotify( partitionProvider, request.partitionId, request.queueIndex); - - outboundQueue.notifyReaderCreated(reader); Review Comment: What has happened with this call? I don't see it being replaced by anything else? Are you updating `PartitionRequestQueue#allReaders` somewhere else? 
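To make the `notifyReaderCreated` question concrete: if that call is what registers the reader in `PartitionRequestQueue#allReaders`, dropping it means a reader still waiting for its partition is never tracked, so later messages addressed to its receiver id (such as added credit) cannot find it. A minimal sketch, using hypothetical simplified types (the `partitionExists` flag replaces the real provider lookup), of keeping that registration unconditional.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for PartitionRequestQueue: tracks every reader by receiver id. */
class SketchOutboundQueue {
    final Map<String, SketchViewReader> allReaders = new HashMap<>();

    void notifyReaderCreated(SketchViewReader reader) {
        allReaders.put(reader.receiverId, reader);
    }

    /** Credit for an untracked receiver is simply lost. */
    boolean addCredit(String receiverId, int credit) {
        SketchViewReader reader = allReaders.get(receiverId);
        if (reader == null) {
            return false;
        }
        reader.credit += credit;
        return true;
    }
}

/** Hypothetical stand-in for the view reader. */
class SketchViewReader {
    final String receiverId;
    int credit;
    boolean viewCreated;

    SketchViewReader(String receiverId, int initialCredit) {
        this.receiverId = receiverId;
        this.credit = initialCredit;
    }

    /** Simplified: the view exists only if the partition already exists. */
    void requestSubpartitionViewOrRegisterListener(boolean partitionExists) {
        viewCreated = partitionExists;
    }
}

/** Hypothetical stand-in for the request branch of PartitionRequestServerHandler. */
final class SketchServerHandler {
    private SketchServerHandler() {}

    static SketchViewReader onPartitionRequest(
            SketchOutboundQueue queue, String receiverId, int credit, boolean partitionExists) {
        SketchViewReader reader = new SketchViewReader(receiverId, credit);
        reader.requestSubpartitionViewOrRegisterListener(partitionExists);
        // Register the reader whether or not its view exists yet.
        queue.notifyReaderCreated(reader);
        return reader;
    }
}
```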
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java: ########## @@ -104,12 +192,40 @@ public void shutdown() { registeredPartitions.clear(); + requestPartitionNotifiers.clear(); Review Comment: I think that's a good point. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ########## @@ -101,6 +105,55 @@ public void requestSubpartitionView( notifyDataAvailable(); } + @Override + public void requestSubpartitionView( + ResultPartition partition, + int subPartitionIndex) throws IOException { + synchronized (requestLock) { + if (subpartitionView == null) { Review Comment: nit: you can replace this `if` check, and many similar checks in other places, with one-liners like: `checkState(subpartitionView == null, "Subpartition already requested")` ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java: ########## @@ -55,6 +68,29 @@ public void testCreateViewForRegisteredPartition() throws Exception { partition.getPartitionId(), 0, new NoOpBufferAvailablityListener()); } + @Test + public void testCreateViewNotifierAfterRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + partitionManager.registerResultPartition(partition); + PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier(); + assertNotNull(partitionManager.createSubpartitionViewOrNotify( + partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier)); Review Comment: instead of `assertNotNull`, you should: 1. check that the list of registered listeners/notifiers in the `ResultPartitionManager` is empty 2. assert that `partitionRequestNotifier` has been notified with the correct partition and subpartition id.
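The `checkState` suggestion refers to Flink's `org.apache.flink.util.Preconditions#checkState(boolean, Object)`, which throws an `IllegalStateException` when the condition is false. To keep this sketch free of Flink dependencies it uses a local helper with the same contract; `SketchCreditReader` is a trimmed, hypothetical stand-in for the reader in the diff.

```java
/** Local helper mirroring Flink's Preconditions.checkState(boolean, Object). */
final class SketchPreconditions {
    private SketchPreconditions() {}

    static void checkState(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalStateException(String.valueOf(errorMessage));
        }
    }
}

/** Hypothetical, trimmed reader showing the one-line check. */
class SketchCreditReader {
    private final Object requestLock = new Object();
    private String subpartitionView; // stand-in for ResultSubpartitionView

    void notifySubpartitionCreated(String partitionId, int subPartitionIndex) {
        synchronized (requestLock) {
            // Replaces the if/else-throw block from the diff with a single precondition.
            SketchPreconditions.checkState(subpartitionView == null, "Subpartition already requested");
            subpartitionView = partitionId + "#" + subPartitionIndex;
        }
    }
}
```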
########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java: ########## @@ -55,6 +68,29 @@ public void testCreateViewForRegisteredPartition() throws Exception { partition.getPartitionId(), 0, new NoOpBufferAvailablityListener()); } + @Test + public void testCreateViewNotifierAfterRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + partitionManager.registerResultPartition(partition); + PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier(); + assertNotNull(partitionManager.createSubpartitionViewOrNotify( + partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier)); + } + + @Test + public void testCreateViewNotifierBeforeRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = createPartition(); + + PartitionRequestNotifier partitionRequestNotifier = new TestingPartitionRequestNotifier(); + assertNull(partitionManager.createSubpartitionViewOrNotify( + partition.getPartitionId(), 0, new NoOpBufferAvailablityListener(), partitionRequestNotifier)); + + partitionManager.registerResultPartition(partition); Review Comment: after this you could actually assert: 1. check that list of registered listeners/notifiers in the `ResultPartitionManager` is empty 2. assert that `partitionRequestNotifier` has been notified with correct partition and subpartition id. ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingResultPartitionProvider.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestNotifier; + +import java.io.IOException; +import java.util.function.Consumer; +import java.util.function.Function; + +/** {@link ResultPartitionProvider} implementation for testing purposes. */ +public class TestingResultPartitionProvider implements ResultPartitionProvider { + private final Function<Tuple3<ResultPartitionID, Integer, BufferAvailabilityListener>, ResultSubpartitionView> createSubpartitionViewFunction; + private final Function<Tuple4<ResultPartitionID, Integer, BufferAvailabilityListener, PartitionRequestNotifier>, ResultSubpartitionView> createSubpartitionViewOrNotifyFunction; + private final Consumer<NettyPartitionRequestNotifier> releasePartitionRequestNotifierConsumer; Review Comment: Instead of using the generic `Function` or `Consumer`, could you create named interfaces for those functions and use those named interfaces here, in the arguments and in the builder? It makes looking for implementations in the IDE much easier, as the results would then be narrowed down to actual usages. Right now it would find all implementations of `Function`/`Consumer` in the codebase.
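A minimal sketch of the named-interface approach: each test hook gets its own `@FunctionalInterface`, so "find implementations" in an IDE lists only real usages, and the hook can declare `IOException` directly instead of going through `Function`. The type names (`CreateSubpartitionViewFunction`, `ReleasePartitionRequestListenerConsumer`, `SketchTestingResultPartitionProvider`) and the simplified `String`-based signatures are hypothetical.

```java
import java.io.IOException;

@FunctionalInterface
interface CreateSubpartitionViewFunction {
    String createSubpartitionView(String partitionId, int index) throws IOException;
}

@FunctionalInterface
interface ReleasePartitionRequestListenerConsumer {
    void releasePartitionRequestListener(String receiverId);
}

/** Hypothetical, trimmed testing provider wired up through named interfaces. */
final class SketchTestingResultPartitionProvider {
    private final CreateSubpartitionViewFunction createSubpartitionViewFunction;
    private final ReleasePartitionRequestListenerConsumer releaseListenerConsumer;

    private SketchTestingResultPartitionProvider(Builder builder) {
        this.createSubpartitionViewFunction = builder.createSubpartitionViewFunction;
        this.releaseListenerConsumer = builder.releaseListenerConsumer;
    }

    String createSubpartitionView(String partitionId, int index) throws IOException {
        return createSubpartitionViewFunction.createSubpartitionView(partitionId, index);
    }

    void releasePartitionRequestListener(String receiverId) {
        releaseListenerConsumer.releasePartitionRequestListener(receiverId);
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // No-op defaults keep tests that don't care about a hook short.
        private CreateSubpartitionViewFunction createSubpartitionViewFunction = (id, index) -> null;
        private ReleasePartitionRequestListenerConsumer releaseListenerConsumer = receiverId -> {};

        Builder setCreateSubpartitionViewFunction(CreateSubpartitionViewFunction function) {
            this.createSubpartitionViewFunction = function;
            return this;
        }

        Builder setReleasePartitionRequestListenerConsumer(ReleasePartitionRequestListenerConsumer consumer) {
            this.releaseListenerConsumer = consumer;
            return this;
        }

        SketchTestingResultPartitionProvider build() {
            return new SketchTestingResultPartitionProvider(this);
        }
    }
}
```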
########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -451,6 +451,14 @@ public class NettyShuffleEnvironmentOptions { .withDescription( "Maximum backoff in milliseconds for partition requests of input channels."); + /** The timeout for partition request notifier. */ + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + public static final ConfigOption<Long> NETWORK_REQUEST_NOTIFY_TIMEOUT = + key("taskmanager.network.partition-request-notify-timeout-ms") + .longType() + .defaultValue(10000L) + .withDescription("Timeout in milliseconds for partition request notifier in result partition manager."); + Review Comment: How is this going to interplay with the pre-existing request backoff? I have a feeling that something is missing here. The current default behaviour is that the timeout is effectively `22.7` seconds, including all exponential backoff stages. If possible, we should make sure that the new default behaviour at least approximately keeps this value. For example, `20` seconds would be good enough IMO. However, as far as I understand this PR, the exponential backoff is still in effect? If the partition is not found, the downstream client will create a request, and this request will time out after `10s`. The client will back off for `100ms` and send another request that will time out after another `10s`. The client will then increase the backoff to `200ms` and retry. Rinse and repeat until the backoff reaches `10s`, which in total means that the client will be retrying the request for something like 80 seconds?
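A rough worst-case model of the retry arithmetic above. It assumes Flink's default partition-request backoff of 100 ms initial and 10 000 ms max, a backoff that doubles and is capped at the max (the capped stage being the last one), and, as the concern suggests, that every request attempt (the first one plus one retry per backoff stage) waits the full 10 s notify timeout before failing. `BackoffModel` is a hypothetical helper, not Flink code; it reproduces the `22.7` s backoff total quoted above.

```java
/** Rough, hypothetical model of partition-request retries; not Flink code. */
final class BackoffModel {
    private BackoffModel() {}

    /** Backoff stages: initial, doubled each time, capped at max; the capped stage is the last. */
    static int backoffStages(long initialMs, long maxMs) {
        validate(initialMs, maxMs);
        int stages = 0;
        long current = initialMs;
        while (true) {
            stages++;
            if (current >= maxMs) {
                return stages;
            }
            current = Math.min(current * 2, maxMs);
        }
    }

    /** Sum of all backoff waits over those stages. */
    static long totalBackoffMs(long initialMs, long maxMs) {
        validate(initialMs, maxMs);
        long total = 0;
        long current = initialMs;
        while (true) {
            total += current;
            if (current >= maxMs) {
                return total;
            }
            current = Math.min(current * 2, maxMs);
        }
    }

    /** Worst case if every attempt (first request + one retry per stage) waits the full timeout. */
    static long worstCaseMs(long initialMs, long maxMs, long notifyTimeoutMs) {
        long attempts = backoffStages(initialMs, maxMs) + 1L;
        return totalBackoffMs(initialMs, maxMs) + attempts * notifyTimeoutMs;
    }

    private static void validate(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("Require 0 < initial <= max");
        }
    }
}
```

With these inputs the stages are 100, 200, 400, 800, 1600, 3200, 6400 and 10000 ms (22.7 s in total), so 9 attempts at 10 s each put the worst case near 113 s. That is the same order as the ~80 s estimate and far above the ~20 s target; the exact figure depends on how many attempts really wait the full timeout.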
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java: ########## @@ -51,6 +89,17 @@ public void registerResultPartition(ResultPartition partition) { throw new IllegalStateException("Result partition already registered."); } + InputRequestNotifierManager notifiers = requestPartitionNotifiers.remove(partition.getPartitionId()); + if (notifiers != null) { + for (PartitionRequestNotifier notifier : notifiers.getPartitionRequestNotifiers()) { + try { + notifier.notifyPartitionRequest(partition); + } catch (IOException e) { + throw new RuntimeException(e); Review Comment: you don't need to wrap the `IOException` in a `RuntimeException`. I think this method can easily throw `IOException` itself. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java: ########## @@ -29,4 +31,28 @@ ResultSubpartitionView createSubpartitionView( int index, BufferAvailabilityListener availabilityListener) throws IOException; + + /** + * If the upstream task's partition has been registered, returns the result subpartition input + * view immediately, otherwise save the notifier and return null. + * + * @param partitionId the result partition id + * @param index the index + * @param availabilityListener the buffer availability listener + * @param notifier the partition request notifier + * @return the result subpartition view + * @throws IOException the thrown exception + */ + ResultSubpartitionView createSubpartitionViewOrNotify( + ResultPartitionID partitionId, + int index, + BufferAvailabilityListener availabilityListener, + PartitionRequestNotifier notifier) throws IOException; Review Comment: Why does it return `ResultSubpartitionView`? I think it would be simpler if it returned a `boolean` indicating whether the request has been processed immediately. Or it could even return `void`, as the return value is not used in production code, and maybe the unit test can make an assertion in another way?
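A minimal sketch combining the two suggestions above: `registerResultPartition` declares `throws IOException`, so a failing listener's exception propagates unwrapped, and the request-side method returns a `boolean` saying whether it was served immediately. `SketchCreationListener`, `SketchResultPartitionManager` and the `String` ids are hypothetical simplifications; for brevity, listeners are called while holding the manager's lock.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical listener whose callback may fail with an IOException. */
interface SketchCreationListener {
    void notifyPartitionCreated(String partitionId) throws IOException;
}

/** Hypothetical, trimmed stand-in for ResultPartitionManager. */
class SketchResultPartitionManager {
    private final Set<String> registeredPartitions = new HashSet<>();
    private final Map<String, List<SketchCreationListener>> pendingListeners = new HashMap<>();

    /** Returns true if the partition already existed and the listener was notified right away. */
    synchronized boolean createSubpartitionViewOrRegisterListener(
            String partitionId, SketchCreationListener listener) throws IOException {
        if (registeredPartitions.contains(partitionId)) {
            listener.notifyPartitionCreated(partitionId);
            return true;
        }
        pendingListeners.computeIfAbsent(partitionId, id -> new ArrayList<>()).add(listener);
        return false;
    }

    /** A listener's IOException propagates as-is instead of being wrapped in a RuntimeException. */
    synchronized void registerResultPartition(String partitionId) throws IOException {
        if (!registeredPartitions.add(partitionId)) {
            throw new IllegalStateException("Result partition already registered.");
        }
        List<SketchCreationListener> listeners = pendingListeners.remove(partitionId);
        if (listeners != null) {
            for (SketchCreationListener listener : listeners) {
                listener.notifyPartitionCreated(partitionId);
            }
        }
    }
}
```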
