HuangZhenQiu commented on a change in pull request #11541:
URL: https://github.com/apache/flink/pull/11541#discussion_r442989870
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -60,49 +71,41 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
         while (client == null) {
             entry = clients.get(connectionId);
-            if (entry != null) {
-                // Existing channel or connecting channel
-                if (entry instanceof NettyPartitionRequestClient) {
-                    client = (NettyPartitionRequestClient) entry;
-                }
-                else {
-                    ConnectingChannel future = (ConnectingChannel) entry;
-                    client = future.waitForChannel();
-
-                    clients.replace(connectionId, future, client);
-                }
-            }
-            else {
-                // No channel yet. Create one, but watch out for a race.
-                // We create a "connecting future" and atomically add it to the map.
-                // Only the thread that really added it establishes the channel.
-                // The others need to wait on that original establisher's future.
-                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
-                Object old = clients.putIfAbsent(connectionId, connectingChannel);
-
-                if (old == null) {
-                    nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-                    client = connectingChannel.waitForChannel();
+            synchronized (connectionId) {
Review comment:
Yes, you are right. Java doesn't synchronize on the variable itself; `synchronized` locks the monitor of the object the variable refers to.
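
To illustrate the concern with `synchronized (connectionId)`, here is a minimal sketch. It assumes a hypothetical `Key` class standing in for a value-type key such as `ConnectionID`; the names `MonitorIdentityDemo`, `Key`, `lockFor`, and `sharesMonitor` are made up for this example and are not part of Flink. It shows that two equal but distinct instances have separate monitors, and that one possible workaround is to look up a canonical lock object per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MonitorIdentityDemo {

    /** Hypothetical value-type key; equality is based on the address string. */
    public static final class Key {
        private final String address;

        public Key(String address) {
            this.address = address;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).address.equals(address);
        }

        @Override
        public int hashCode() {
            return address.hashCode();
        }
    }

    // One canonical lock object per distinct (by equals) key.
    private static final ConcurrentMap<Key, Object> LOCKS = new ConcurrentHashMap<>();

    /** Returns the same lock object for all keys that are equal to each other. */
    public static Object lockFor(Key key) {
        return LOCKS.computeIfAbsent(key, k -> new Object());
    }

    /** True if holding the monitor of 'held' also means holding the monitor of 'other'. */
    public static boolean sharesMonitor(Object held, Object other) {
        synchronized (held) {
            return Thread.holdsLock(other);
        }
    }

    public static void main(String[] args) {
        Key a = new Key("host:1234");
        Key b = new Key("host:1234");

        System.out.println(a.equals(b));                            // true
        System.out.println(sharesMonitor(a, b));                    // false: distinct objects, distinct monitors
        System.out.println(sharesMonitor(lockFor(a), lockFor(b)));  // true: same canonical lock object
    }
}
```

Whether a per-key lock map is the right fix for this factory is a separate design question; the sketch only demonstrates why locking on a freshly passed-in key instance may not give mutual exclusion between callers.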