himanshug closed pull request #6263: Add ability to specify list of task ports 
and port range
URL: https://github.com/apache/incubator-druid/pull/6263
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index c87da2a06e9..0a1831fa2f2 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1039,8 +1039,9 @@ Middle managers pass their configurations down to their 
child peons. The middle
 |`druid.indexer.runner.javaOpts`|*DEPRECATED* A string of -X Java options to 
pass to the peon's JVM. Quotable parameters or parameters with spaces are 
encouraged to use javaOptsArray|""|
 |`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in 
as options to the peon's jvm. This is additive to javaOpts and is recommended 
for properly handling arguments which contain quotes or spaces like 
`["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can 
be created in Zookeeper.|524288|
-|`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023.|8100|
-|`druid.indexer.runner.separateIngestionEndpoint`|*Deprecated.* Use separate 
server and consequently separate jetty thread pool for ingesting events. Not 
supported with TLS.|false|
+|`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023 and less than 65536.|8100|
+|`druid.indexer.runner.endPort`|Ending port used for peon processes, should be 
greater than or equal to `druid.indexer.runner.startPort` and less than 
65536.|65535|
+|`druid.indexer.runner.ports`|A json array of integers to specify ports that 
are used for peon processes. If provided and non-empty, ports for peon processes 
will be chosen from these ports, and 
`druid.indexer.runner.startPort`/`druid.indexer.runner.endPort` will be 
completely ignored.|`[]`|
 |`druid.worker.ip`|The IP of the worker.|localhost|
 |`druid.worker.version`|Version identifier for the middle manager.|0|
 |`druid.worker.capacity`|Maximum number of tasks the middle manager can 
accept.|Number of available processors - 1|
@@ -1103,14 +1104,6 @@ Additional peon configs include:
 |`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will 
attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
 |`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests 
served by a task's chat handler. Set to 0 to disable limiting.|0|
 
-If the deprecated `druid.indexer.runner.separateIngestionEndpoint` property is 
set to true then following configurations
-are available for the ingestion server at peon:
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.indexer.server.chathandler.http.numThreads`|*Deprecated.* Number of 
threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 
16 + 2) + 30|
-|`druid.indexer.server.chathandler.http.maxIdleTime`|*Deprecated.* The Jetty 
max idle time for a connection.|PT5M|
-
 If the peon is running in remote mode, there must be an overlord up and 
running. Peons in remote mode can set the following configurations:
 
 |Property|Description|Default|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05bf2a198c0..c393c41c673 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -128,7 +128,7 @@ public ForkingTaskRunner(
     this.taskLogPusher = taskLogPusher;
     this.jsonMapper = jsonMapper;
     this.node = node;
-    this.portFinder = new PortFinder(config.getStartPort());
+    this.portFinder = new PortFinder(config.getStartPort(), 
config.getEndPort(), config.getPorts());
     this.exec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(workerConfig.getCapacity(), 
"forking-task-runner-%d")
     );
@@ -232,16 +232,9 @@ public TaskStatus call()
                         final String childHost = node.getHost();
                         int childPort = -1;
                         int tlsChildPort = -1;
-                        int childChatHandlerPort = -1;
 
                         if (node.isEnablePlaintextPort()) {
-                          if (config.isSeparateIngestionEndpoint()) {
-                            Pair<Integer, Integer> portPair = 
portFinder.findTwoConsecutiveUnusedPorts();
-                            childPort = portPair.lhs;
-                            childChatHandlerPort = portPair.rhs;
-                          } else {
-                            childPort = portFinder.findUnusedPort();
-                          }
+                          childPort = portFinder.findUnusedPort();
                         }
 
                         if (node.isEnableTlsPort()) {
@@ -396,22 +389,6 @@ public TaskStatus call()
                                command.add("-XX:ThreadPriorityPolicy=42");
                                */
 
-                              if (config.isSeparateIngestionEndpoint()) {
-                                command.add(StringUtils.format(
-                                    
"-Ddruid.indexer.task.chathandler.service=%s",
-                                    "placeholder/serviceName"
-                                ));
-                                // Actual serviceName will be passed by the 
EventReceiverFirehose when it registers itself with ChatHandlerProvider
-                                // Thus, "placeholder/serviceName" will be 
ignored
-                                
command.add(StringUtils.format("-Ddruid.indexer.task.chathandler.host=%s", 
childHost));
-                                command.add(StringUtils.format(
-                                    "-Ddruid.indexer.task.chathandler.port=%d",
-                                    childChatHandlerPort
-                                ));
-                                // Note - TLS is not supported with separate 
ingestion config,
-                                // if set then peon task will fail to start
-                              }
-
                               command.add("org.apache.druid.cli.Main");
                               command.add("internal");
                               command.add("peon");
@@ -515,9 +492,6 @@ public TaskStatus call()
                             if (node.isEnableTlsPort()) {
                               portFinder.markPortUnused(tlsChildPort);
                             }
-                            if (childChatHandlerPort > 0) {
-                              portFinder.markPortUnused(childChatHandlerPort);
-                            }
 
                             try {
                               if (!stopping && taskDir.exists()) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
index 79b5b0cc5f9..8ec7d467ff8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
@@ -19,27 +19,32 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.util.List;
 import java.util.Set;
 
 public class PortFinder
 {
   private final Set<Integer> usedPorts = Sets.newHashSet();
   private final int startPort;
+  private final int endPort;
+  private final List<Integer> candidatePorts;
 
-  public PortFinder(int startPort)
+  public PortFinder(int startPort, int endPort, List<Integer> candidatePorts)
   {
     this.startPort = startPort;
+    this.endPort = endPort;
+    this.candidatePorts = candidatePorts;
   }
 
-  private static boolean canBind(int portNum)
+  @VisibleForTesting
+  boolean canBind(int portNum)
   {
     try {
       new ServerSocket(portNum).close();
@@ -55,39 +60,44 @@ private static boolean canBind(int portNum)
 
   public synchronized int findUnusedPort()
   {
-    int port = chooseNext(startPort);
-    while (!canBind(port)) {
-      port = chooseNext(port + 1);
+    if (candidatePorts != null && !candidatePorts.isEmpty()) {
+      int port = chooseFromCandidates();
+      usedPorts.add(port);
+      return port;
+    } else {
+      int port = chooseNext(startPort);
+      while (!canBind(port)) {
+        port = chooseNext(port + 1);
+      }
+      usedPorts.add(port);
+      return port;
     }
-    usedPorts.add(port);
-    return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized void markPortUnused(int port)
   {
-    int firstPort = chooseNext(startPort);
-    while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-      firstPort = chooseNext(firstPort + 1);
-    }
-    usedPorts.add(firstPort);
-    usedPorts.add(firstPort + 1);
-    return new Pair<>(firstPort, firstPort + 1);
+    usedPorts.remove(port);
   }
 
-  public synchronized void markPortUnused(int port)
+  private int chooseFromCandidates()
   {
-    usedPorts.remove(port);
+    for (int port : candidatePorts) {
+      if (!usedPorts.contains(port) && canBind(port)) {
+        return port;
+      }
+    }
+    throw new ISE("All ports are used...");
   }
 
   private int chooseNext(int start)
   {
-    // up to unsigned short max (65535)
-    for (int i = start; i <= 0xFFFF; i++) {
+    // up to endPort (whose default value is 65535)
+    for (int i = start; i <= endPort; i++) {
       if (!usedPorts.contains(i)) {
         return i;
       }
     }
-    throw new ISE("All ports are Used..");
+    throw new ISE("All ports are used...");
   }
 }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
index a8355cfd2c7..2f0add97a85 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
@@ -62,6 +62,19 @@
   @Max(65535)
   private int startPort = 8100;
 
+  @JsonProperty
+  @Min(1024)
+  @Max(65535)
+  private int endPort = 65535;
+
+  /**
+   * Task ports your services are going to use. If non-empty, ports for one 
task will be chosen from these ports.
+   * Otherwise, startPort and endPort are used to generate usable ports.
+   */
+  @JsonProperty
+  @NotNull
+  private List<Integer> ports = ImmutableList.of();
+
   @JsonProperty
   @NotNull
   List<String> allowedPrefixes = Lists.newArrayList(
@@ -74,14 +87,6 @@
       "hadoop"
   );
 
-  @JsonProperty
-  private boolean separateIngestionEndpoint = false;
-
-  public boolean isSeparateIngestionEndpoint()
-  {
-    return separateIngestionEndpoint;
-  }
-
   public String getJavaCommand()
   {
     return javaCommand;
@@ -107,6 +112,16 @@ public int getStartPort()
     return startPort;
   }
 
+  public int getEndPort()
+  {
+    return endPort;
+  }
+
+  public List<Integer> getPorts()
+  {
+    return ports;
+  }
+
   public List<String> getAllowedPrefixes()
   {
     return allowedPrefixes;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
index b2db791bcf4..f57093cc806 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
@@ -19,35 +19,55 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.collect.ImmutableList;
+import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+
 
 public class PortFinderTest
 {
-  private final PortFinder finder = new PortFinder(1200);
+  private final List<PortFinder> finders = new ArrayList<>();
+
+  @Before
+  public void setUp()
+  {
+    // use startPort and endPort to generate usable ports.
+    PortFinder finder1 = EasyMock.createMockBuilder(PortFinder.class)
+                                 .withConstructor(1200, 1201, 
ImmutableList.of())
+                                 .addMockedMethod("canBind")
+                                 .createMock();
+    // choose usable ports from candidates
+    PortFinder finder2 = EasyMock.createMockBuilder(PortFinder.class)
+                                 .withConstructor(1024, 1025, 
ImmutableList.of(1200, 1201))
+                                 .addMockedMethod("canBind")
+                                 .createMock();
+
+    finders.add(finder1);
+    finders.add(finder2);
+  }
 
   @Test
-  public void testUsedPort() throws IOException
+  public void testUsedPort()
   {
-    final int port1 = finder.findUnusedPort();
-    // verify that the port is free
-    ServerSocket socket1 = new ServerSocket(port1);
-    finder.markPortUnused(port1);
-    final int port2 = finder.findUnusedPort();
-    Assert.assertNotEquals("Used port is not reallocated", port1, port2);
-    // verify that port2 is free
-    ServerSocket socket2 = new ServerSocket(port2);
-
-    socket1.close();
-    // Now port1 should get recycled
-    Assert.assertEquals(port1, finder.findUnusedPort());
-
-    socket2.close();
-    finder.markPortUnused(port1);
-    finder.markPortUnused(port2);
+    for (PortFinder finder : finders) {
+      EasyMock.expect(finder.canBind(1200)).andReturn(true).andReturn(false);
+      EasyMock.expect(finder.canBind(1201)).andReturn(true);
+      EasyMock.replay(finder);
+
+      final int port1 = finder.findUnusedPort();
+      Assert.assertEquals(1200, port1);
+      finder.markPortUnused(port1);
+
+      final int port2 = finder.findUnusedPort();
+      Assert.assertEquals(1201, port2);
+      finder.markPortUnused(port2);
 
+      EasyMock.verify(finder);
+    }
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
index 1a7abc46c08..18845e770c0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
@@ -145,6 +145,31 @@ public void testCrazyJavaOptArray() throws 
JsonProcessingException
     );
   }
 
+  @Test
+  public void testPorts() throws JsonProcessingException
+  {
+    final List<Integer> ports = ImmutableList.of(1024, 1025);
+    Assert.assertEquals(
+        ports,
+        buildFromProperties(
+            IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + 
".ports",
+            MAPPER.writeValueAsString(ports)
+        ).getPorts()
+    );
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionalPorts()
+  {
+    
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX 
+ ".ports", "not an Integer");
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionalPorts2()
+  {
+    
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX 
+ ".ports", "1024"); // not an array
+  }
+
   @Test(expected = ProvisionException.class)
   public void testExceptionalJavaOptArray()
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
index 268587e82fc..2a1a0f5662f 100644
--- 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
+++ 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
@@ -26,13 +26,11 @@
 import com.google.inject.Provides;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.druid.guice.Jerseys;
-import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.RemoteChatHandler;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -47,10 +45,7 @@
  */
 public class ChatHandlerServerModule implements Module
 {
-  private static final Logger log = new Logger(ChatHandlerServerModule.class);
   private static final String MAX_CHAT_REQUESTS_PROPERTY = 
"druid.indexer.server.maxChatRequests";
-  private static final String CHAT_PORT_PROPERTY = 
"druid.indexer.task.chathandler.port";
-
   private final Properties properties;
 
   public ChatHandlerServerModule(Properties properties)
@@ -72,32 +67,12 @@ public void configure(Binder binder)
     Multibinder.newSetBinder(binder, 
ServletFilterHolder.class).addBinding().to(TaskIdResponseHeaderFilterHolder.class);
 
     /**
-     * If "druid.indexer.task.chathandler.port" property is set then we assume 
that a separate Jetty Server with its
-     * own {@link ServerConfig} is required for ingestion apart from the query 
server otherwise we bind
-     * {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal 
@}{@link Self} {@link DruidNode}
+     * We bind {@link DruidNode} annotated with {@link RemoteChatHandler} to 
{@literal @}{@link Self} {@link DruidNode}
      * so that same Jetty Server is used for querying as well as ingestion.
      */
-    if (properties.containsKey(CHAT_PORT_PROPERTY)) {
-      log.info("Spawning separate ingestion server at port [%s]", 
properties.getProperty(CHAT_PORT_PROPERTY));
-      JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", 
DruidNode.class, RemoteChatHandler.class);
-      JsonConfigProvider.bind(
-          binder,
-          "druid.indexer.server.chathandler.http",
-          ServerConfig.class,
-          RemoteChatHandler.class
-      );
-      JsonConfigProvider.bind(
-          binder,
-          "druid.indexer.server.chathandler.https",
-          TLSServerConfig.class,
-          RemoteChatHandler.class
-      );
-      LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
-    } else {
-      
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class,
 Self.class));
-      
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
-      
binder.bind(TLSServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(TLSServerConfig.class));
-    }
+    
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class,
 Self.class));
+    
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
+    
binder.bind(TLSServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(TLSServerConfig.class));
   }
 
   @Provides


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to