This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dbe4bb  [FLINK-10580] Harden BootstrapTool#startActorSystem
7dbe4bb is described below

commit 7dbe4bb194f79cd996b46c36037b512e93b1ef07
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 13:35:51 2018 +0200

    [FLINK-10580] Harden BootstrapTool#startActorSystem
    
    Instead of opening a socket to check whether a given port is free we simply 
start an
    ActorSystem with it and check whether it can bind to this port. This also 
solves the problem
    that a 0 port gets resolved to a specific port which might get taken between 
closing the
    test socket and starting the ActorSystem.
---
 .../runtime/clusterframework/BootstrapTools.java   | 15 +-----
 .../clusterframework/BootstrapToolsTest.java       | 57 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d95034d..3a2d2c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -55,7 +55,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.BindException;
-import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -157,19 +156,7 @@ public class BootstrapTools {
                }
 
                while (portsIterator.hasNext()) {
-                       // first, we check if the port is available by opening 
a socket
-                       // if the actor system fails to start on the port, we 
try further
-                       ServerSocket availableSocket = 
NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);
-
-                       int port;
-                       if (availableSocket == null) {
-                               throw new BindException("Unable to allocate 
further port in port range: " + portRangeDefinition);
-                       } else {
-                               port = availableSocket.getLocalPort();
-                               try {
-                                       availableSocket.close();
-                               } catch (IOException ignored) {}
-                       }
+                       final int port = portsIterator.next();
 
                        try {
                                return startActorSystem(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index d1f32cf..c01f1aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -21,10 +21,26 @@ package org.apache.flink.runtime.clusterframework;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
+import akka.actor.ActorSystem;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -33,7 +49,9 @@ import static org.junit.Assert.assertNull;
 /**
  * Tests for {@link BootstrapToolsTest}.
  */
-public class BootstrapToolsTest {
+public class BootstrapToolsTest extends TestLogger {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BootstrapToolsTest.class);
 
        @Test
        public void testSubstituteConfigKey() {
@@ -305,4 +323,41 @@ public class BootstrapToolsTest {
                BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
null);
                assertEquals(config.getString(CoreOptions.TMP_DIRS), 
CoreOptions.TMP_DIRS.defaultValue());
        }
+
+       /**
+        * Tests that we can concurrently create two {@link ActorSystem} 
without port conflicts.
+        * This effectively tests that we don't open a socket to check for a 
port's availability.
+        * See FLINK-10580 for more details.
+        */
+       @Test
+       public void testConcurrentActorSystemCreation() throws Exception {
+               final int concurrentCreations = 10;
+               final ExecutorService executorService = 
Executors.newFixedThreadPool(concurrentCreations);
+               final CyclicBarrier cyclicBarrier = new 
CyclicBarrier(concurrentCreations);
+
+               try {
+                       final List<CompletableFuture<Void>> actorSystemFutures 
= IntStream.range(0, concurrentCreations)
+                               .mapToObj(
+                                       ignored ->
+                                               CompletableFuture.supplyAsync(
+                                                       
CheckedSupplier.unchecked(() -> {
+                                                               
cyclicBarrier.await();
+
+                                                               return 
BootstrapTools.startActorSystem(
+                                                                       new 
Configuration(),
+                                                                       
"localhost",
+                                                                       "0",
+                                                                       LOG);
+                                                       })))
+                               .map(
+                                       // terminate ActorSystems
+                                       actorSystemFuture ->
+                                               
actorSystemFuture.thenCompose(AkkaUtils::terminateActorSystem)
+                               ).collect(Collectors.toList());
+
+                       FutureUtils.completeAll(actorSystemFutures).get();
+               } finally {
+                       ExecutorUtils.gracefulShutdown(10000L, 
TimeUnit.MILLISECONDS, executorService);
+               }
+       }
 }

Reply via email to