This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7dbe4bb [FLINK-10580] Harden BootstrapTool#startActorSystem
7dbe4bb is described below
commit 7dbe4bb194f79cd996b46c36037b512e93b1ef07
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 13:35:51 2018 +0200
[FLINK-10580] Harden BootstrapTool#startActorSystem
Instead of opening a socket to check whether a given port is free, we simply
start an ActorSystem with it and check whether it can bind to this port. This
also solves the problem that port 0 gets resolved to a specific port, which
might then be taken by another process between closing the test socket and
starting the ActorSystem.
---
.../runtime/clusterframework/BootstrapTools.java | 15 +-----
.../clusterframework/BootstrapToolsTest.java | 57 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 15 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d95034d..3a2d2c5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -55,7 +55,6 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.BindException;
-import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -157,19 +156,7 @@ public class BootstrapTools {
}
while (portsIterator.hasNext()) {
- // first, we check if the port is available by opening
a socket
- // if the actor system fails to start on the port, we
try further
- ServerSocket availableSocket =
NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);
-
- int port;
- if (availableSocket == null) {
- throw new BindException("Unable to allocate
further port in port range: " + portRangeDefinition);
- } else {
- port = availableSocket.getLocalPort();
- try {
- availableSocket.close();
- } catch (IOException ignored) {}
- }
+ final int port = portsIterator.next();
try {
return startActorSystem(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index d1f32cf..c01f1aa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -21,10 +21,26 @@ package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
+import akka.actor.ActorSystem;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -33,7 +49,9 @@ import static org.junit.Assert.assertNull;
/**
* Tests for {@link BootstrapToolsTest}.
*/
-public class BootstrapToolsTest {
+public class BootstrapToolsTest extends TestLogger {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BootstrapToolsTest.class);
@Test
public void testSubstituteConfigKey() {
@@ -305,4 +323,41 @@ public class BootstrapToolsTest {
BootstrapTools.updateTmpDirectoriesInConfiguration(config,
null);
assertEquals(config.getString(CoreOptions.TMP_DIRS),
CoreOptions.TMP_DIRS.defaultValue());
}
+
+ /**
+ * Tests that we can concurrently create multiple {@link ActorSystem}s
without port conflicts.
+ * This effectively tests that we don't open a socket to check for a
port's availability.
+ * See FLINK-10580 for more details.
+ */
+ @Test
+ public void testConcurrentActorSystemCreation() throws Exception {
+ final int concurrentCreations = 10;
+ final ExecutorService executorService =
Executors.newFixedThreadPool(concurrentCreations);
+ final CyclicBarrier cyclicBarrier = new
CyclicBarrier(concurrentCreations);
+
+ try {
+ final List<CompletableFuture<Void>> actorSystemFutures
= IntStream.range(0, concurrentCreations)
+ .mapToObj(
+ ignored ->
+ CompletableFuture.supplyAsync(
+
CheckedSupplier.unchecked(() -> {
+
cyclicBarrier.await();
+
+ return
BootstrapTools.startActorSystem(
+ new
Configuration(),
+
"localhost",
+ "0",
+ LOG);
+ })))
+ .map(
+ // terminate ActorSystems
+ actorSystemFuture ->
+
actorSystemFuture.thenCompose(AkkaUtils::terminateActorSystem)
+ ).collect(Collectors.toList());
+
+ FutureUtils.completeAll(actorSystemFutures).get();
+ } finally {
+ ExecutorUtils.gracefulShutdown(10000L,
TimeUnit.MILLISECONDS, executorService);
+ }
+ }
}