QiuMM closed pull request #6262: Add ability to specify list of task ports
URL: https://github.com/apache/incubator-druid/pull/6262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index 5f970c032ea..f959b89bd6b 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1040,6 +1040,7 @@ Middle managers pass their configurations down to their 
child peons. The middle
 |`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in 
as options to the peon's jvm. This is additive to javaOpts and is recommended 
for properly handling arguments which contain quotes or spaces like 
`["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can 
be created in Zookeeper.|524288|
 |`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023.|8100|
+|`druid.indexer.runner.ports`|A json array of integers to specify ports that 
used for peon processes. This property is an alternative to 
`druid.indexer.runner.startPort`. If specified and non-empty, ports for one 
peon process will be chosed from these ports rather than using 
`druid.indexer.runner.startPort` to allocate ports.|`[]`|
 |`druid.indexer.runner.separateIngestionEndpoint`|*Deprecated.* Use separate 
server and consequently separate jetty thread pool for ingesting events. Not 
supported with TLS.|false|
 |`druid.worker.ip`|The IP of the worker.|localhost|
 |`druid.worker.version`|Version identifier for the middle manager.|0|
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index cf4fcdb7c11..ca577c6b82b 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -128,7 +128,7 @@ public ForkingTaskRunner(
     this.taskLogPusher = taskLogPusher;
     this.jsonMapper = jsonMapper;
     this.node = node;
-    this.portFinder = new PortFinder(config.getStartPort());
+    this.portFinder = new PortFinder(config.getStartPort(), config.getPorts());
     this.exec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(workerConfig.getCapacity(), 
"forking-task-runner-%d")
     );
@@ -236,7 +236,7 @@ public TaskStatus call()
 
                         if (node.isEnablePlaintextPort()) {
                           if (config.isSeparateIngestionEndpoint()) {
-                            Pair<Integer, Integer> portPair = 
portFinder.findTwoConsecutiveUnusedPorts();
+                            Pair<Integer, Integer> portPair = 
portFinder.findTwoUnusedPorts();
                             childPort = portPair.lhs;
                             childChatHandlerPort = portPair.rhs;
                           } else {
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java 
b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
index 7a34ee49e8e..1eecf61da56 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
@@ -27,16 +27,19 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.util.List;
 import java.util.Set;
 
 public class PortFinder
 {
   private final Set<Integer> usedPorts = Sets.newHashSet();
   private final int startPort;
+  private final List<Integer> candidatePorts;
 
-  public PortFinder(int startPort)
+  public PortFinder(int startPort, List<Integer> candidatePorts)
   {
     this.startPort = startPort;
+    this.candidatePorts = candidatePorts;
   }
 
   private static boolean canBind(int portNum)
@@ -55,23 +58,37 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-    int port = chooseNext(startPort);
-    while (!canBind(port)) {
-      port = chooseNext(port + 1);
+    if (candidatePorts != null && !candidatePorts.isEmpty()) {
+      int port = chooseFromCandidates();
+      usedPorts.add(port);
+      return port;
+    } else {
+      int port = chooseNext(startPort);
+      while (!canBind(port)) {
+        port = chooseNext(port + 1);
+      }
+      usedPorts.add(port);
+      return port;
     }
-    usedPorts.add(port);
-    return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized Pair<Integer, Integer> findTwoUnusedPorts()
   {
-    int firstPort = chooseNext(startPort);
-    while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-      firstPort = chooseNext(firstPort + 1);
+    if (candidatePorts != null && !candidatePorts.isEmpty()) {
+      int firstPort = chooseFromCandidates();
+      int secondPort = chooseFromCandidates();
+      usedPorts.add(firstPort);
+      usedPorts.add(secondPort);
+      return new Pair<>(firstPort, secondPort);
+    } else {
+      int firstPort = chooseNext(startPort);
+      while (!canBind(firstPort) || !canBind(firstPort + 1)) {
+        firstPort = chooseNext(firstPort + 1);
+      }
+      usedPorts.add(firstPort);
+      usedPorts.add(firstPort + 1);
+      return new Pair<>(firstPort, firstPort + 1);
     }
-    usedPorts.add(firstPort);
-    usedPorts.add(firstPort + 1);
-    return new Pair<>(firstPort, firstPort + 1);
   }
 
   public synchronized void markPortUnused(int port)
@@ -79,6 +96,16 @@ public synchronized void markPortUnused(int port)
     usedPorts.remove(port);
   }
 
+  private int chooseFromCandidates()
+  {
+    for (int port : candidatePorts) {
+      if (!usedPorts.contains(port) && canBind(port)) {
+        return port;
+      }
+    }
+    throw new ISE("All ports are Used..");
+  }
+
   private int chooseNext(int start)
   {
     // up to unsigned short max (65535)
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
 
b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
index 4c1bfd1391a..152145f6e5b 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
@@ -62,6 +62,14 @@
   @Max(65535)
   private int startPort = 8100;
 
+  /**
+   * Task ports your services are going to use. If non-empty, ports for one 
task will be chosed from these ports.
+   * Otherwise, using startPort to generate ports.
+   */
+  @JsonProperty
+  @NotNull
+  private List<Integer> ports = ImmutableList.of();
+
   @JsonProperty
   @NotNull
   List<String> allowedPrefixes = Lists.newArrayList(
@@ -107,6 +115,11 @@ public int getStartPort()
     return startPort;
   }
 
+  public List<Integer> getPorts()
+  {
+    return ports;
+  }
+
   public List<String> getAllowedPrefixes()
   {
     return allowedPrefixes;
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java 
b/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java
index b6215ea0b1b..6079982d004 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java
@@ -19,7 +19,9 @@
 
 package io.druid.indexing.overlord;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -27,27 +29,66 @@
 
 public class PortFinderTest
 {
-  private final PortFinder finder = new PortFinder(1200);
+  private PortFinder finderUseStartPort;
+  private PortFinder finderUseCandidatePorts;
+
+  @Before
+  public void setUp()
+  {
+    finderUseStartPort = new PortFinder(1200, ImmutableList.of());
+    // find two unused port for 'finderUseCandidatePorts'
+    for (int i = 1024; i <= 0xFFFF; ++i) {
+      try {
+        new ServerSocket(i).close();
+        new ServerSocket(i + 1).close();
+        finderUseCandidatePorts = new PortFinder(1200, ImmutableList.of(i, i + 
1));
+        break;
+      }
+      catch (Exception e) {
+        // do nothing
+      }
+    }
+  }
 
   @Test
-  public void testUsedPort() throws IOException
+  public void testUseStartPort() throws IOException
   {
-    final int port1 = finder.findUnusedPort();
+    final int port1 = finderUseStartPort.findUnusedPort();
     // verify that the port is free
     ServerSocket socket1 = new ServerSocket(port1);
-    finder.markPortUnused(port1);
-    final int port2 = finder.findUnusedPort();
+    finderUseStartPort.markPortUnused(port1);
+    final int port2 = finderUseStartPort.findUnusedPort();
     Assert.assertNotEquals("Used port is not reallocated", port1, port2);
     // verify that port2 is free
     ServerSocket socket2 = new ServerSocket(port2);
 
     socket1.close();
     // Now port1 should get recycled
-    Assert.assertEquals(port1, finder.findUnusedPort());
+    Assert.assertEquals(port1, finderUseStartPort.findUnusedPort());
 
     socket2.close();
-    finder.markPortUnused(port1);
-    finder.markPortUnused(port2);
+    finderUseStartPort.markPortUnused(port1);
+    finderUseStartPort.markPortUnused(port2);
+  }
+
+  @Test
+  public void testUseCandidatePorts() throws IOException
+  {
+    final int port1 = finderUseCandidatePorts.findUnusedPort();
+    // verify that the port is free
+    ServerSocket socket1 = new ServerSocket(port1);
+    finderUseCandidatePorts.markPortUnused(port1);
+    final int port2 = finderUseCandidatePorts.findUnusedPort();
+    Assert.assertNotEquals("Used port is not reallocated", port1, port2);
+    // verify that port2 is free
+    ServerSocket socket2 = new ServerSocket(port2);
 
+    socket1.close();
+    // Now port1 should get recycled
+    Assert.assertEquals(port1, finderUseCandidatePorts.findUnusedPort());
+
+    socket2.close();
+    finderUseCandidatePorts.markPortUnused(port1);
+    finderUseCandidatePorts.markPortUnused(port2);
   }
 }
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
index 58deb48889b..8f81e485a53 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
@@ -145,6 +145,31 @@ public void testCrazyJavaOptArray() throws 
JsonProcessingException
     );
   }
 
+  @Test
+  public void testPorts() throws JsonProcessingException
+  {
+    final List<Integer> ports = ImmutableList.of(1024, 1025);
+    Assert.assertEquals(
+        ports,
+        buildFromProperties(
+            IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + 
".ports",
+            MAPPER.writeValueAsString(ports)
+        ).getPorts()
+    );
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionalPorts()
+  {
+    
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX 
+ ".ports", "not an Integer");
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionalPorts2()
+  {
+    
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX 
+ ".ports", "1024"); // not an array
+  }
+
   @Test(expected = ProvisionException.class)
   public void testExceptionalJavaOptArray()
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to