This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 87ccee0  Add ability to specify list of task ports and port range 
(#6263)
87ccee0 is described below

commit 87ccee05f76e983874b41090f2b0a6fe1cf4b8dd
Author: QiuMM <[email protected]>
AuthorDate: Fri Sep 14 10:36:04 2018 +0800

    Add ability to specify list of task ports and port range (#6263)
    
    * support specifying a list of task ports
    
    * fix typos
    
    * address comments
    
    * remove druid.indexer.runner.separateIngestionEndpoint config
    
    * tweak doc
    
    * fix doc
    
    * code cleanup
    
    * keep some useful comments
---
 docs/content/configuration/index.md                | 13 ++---
 .../druid/indexing/overlord/ForkingTaskRunner.java | 30 +----------
 .../apache/druid/indexing/overlord/PortFinder.java | 54 +++++++++++--------
 .../overlord/config/ForkingTaskRunnerConfig.java   | 31 ++++++++---
 .../druid/indexing/overlord/PortFinderTest.java    | 60 ++++++++++++++--------
 .../config/ForkingTaskRunnerConfigTest.java        | 25 +++++++++
 .../jetty/ChatHandlerServerModule.java             | 33 ++----------
 7 files changed, 129 insertions(+), 117 deletions(-)

diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index f430db7..771b872 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1039,8 +1039,9 @@ Middle managers pass their configurations down to their 
child peons. The middle
 |`druid.indexer.runner.javaOpts`|*DEPRECATED* A string of -X Java options to 
pass to the peon's JVM. Quotable parameters or parameters with spaces are 
encouraged to use javaOptsArray|""|
 |`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in 
as options to the peon's jvm. This is additive to javaOpts and is recommended 
for properly handling arguments which contain quotes or spaces like 
`["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can 
be created in Zookeeper.|524288|
-|`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023.|8100|
-|`druid.indexer.runner.separateIngestionEndpoint`|*Deprecated.* Use separate 
server and consequently separate jetty thread pool for ingesting events. Not 
supported with TLS.|false|
+|`druid.indexer.runner.startPort`|Starting port used for peon processes, 
should be greater than 1023 and less than 65536.|8100|
+|`druid.indexer.runner.endPort`|Ending port used for peon processes, should be 
greater than or equal to `druid.indexer.runner.startPort` and less than 
65536.|65535|
+|`druid.indexer.runner.ports`|A json array of integers specifying the ports 
that are used for peon processes. If provided and non-empty, ports for peon 
processes will be chosen from these ports, and 
`druid.indexer.runner.startPort/druid.indexer.runner.endPort` will be 
completely ignored.|`[]`|
 |`druid.worker.ip`|The IP of the worker.|localhost|
 |`druid.worker.version`|Version identifier for the middle manager.|0|
 |`druid.worker.capacity`|Maximum number of tasks the middle manager can 
accept.|Number of available processors - 1|
@@ -1103,14 +1104,6 @@ Additional peon configs include:
 |`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will 
attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
 |`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests 
served by a task's chat handler. Set to 0 to disable limiting.|0|
 
-If the deprecated `druid.indexer.runner.separateIngestionEndpoint` property is 
set to true then following configurations
-are available for the ingestion server at peon:
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.indexer.server.chathandler.http.numThreads`|*Deprecated.* Number of 
threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 
16 + 2) + 30|
-|`druid.indexer.server.chathandler.http.maxIdleTime`|*Deprecated.* The Jetty 
max idle time for a connection.|PT5M|
-
 If the peon is running in remote mode, there must be an overlord up and 
running. Peons in remote mode can set the following configurations:
 
 |Property|Description|Default|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05bf2a1..c393c41 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -128,7 +128,7 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
     this.taskLogPusher = taskLogPusher;
     this.jsonMapper = jsonMapper;
     this.node = node;
-    this.portFinder = new PortFinder(config.getStartPort());
+    this.portFinder = new PortFinder(config.getStartPort(), 
config.getEndPort(), config.getPorts());
     this.exec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(workerConfig.getCapacity(), 
"forking-task-runner-%d")
     );
@@ -232,16 +232,9 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
                         final String childHost = node.getHost();
                         int childPort = -1;
                         int tlsChildPort = -1;
-                        int childChatHandlerPort = -1;
 
                         if (node.isEnablePlaintextPort()) {
-                          if (config.isSeparateIngestionEndpoint()) {
-                            Pair<Integer, Integer> portPair = 
portFinder.findTwoConsecutiveUnusedPorts();
-                            childPort = portPair.lhs;
-                            childChatHandlerPort = portPair.rhs;
-                          } else {
-                            childPort = portFinder.findUnusedPort();
-                          }
+                          childPort = portFinder.findUnusedPort();
                         }
 
                         if (node.isEnableTlsPort()) {
@@ -396,22 +389,6 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
                                command.add("-XX:ThreadPriorityPolicy=42");
                                */
 
-                              if (config.isSeparateIngestionEndpoint()) {
-                                command.add(StringUtils.format(
-                                    
"-Ddruid.indexer.task.chathandler.service=%s",
-                                    "placeholder/serviceName"
-                                ));
-                                // Actual serviceName will be passed by the 
EventReceiverFirehose when it registers itself with ChatHandlerProvider
-                                // Thus, "placeholder/serviceName" will be 
ignored
-                                
command.add(StringUtils.format("-Ddruid.indexer.task.chathandler.host=%s", 
childHost));
-                                command.add(StringUtils.format(
-                                    "-Ddruid.indexer.task.chathandler.port=%d",
-                                    childChatHandlerPort
-                                ));
-                                // Note - TLS is not supported with separate 
ingestion config,
-                                // if set then peon task will fail to start
-                              }
-
                               command.add("org.apache.druid.cli.Main");
                               command.add("internal");
                               command.add("peon");
@@ -515,9 +492,6 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
                             if (node.isEnableTlsPort()) {
                               portFinder.markPortUnused(tlsChildPort);
                             }
-                            if (childChatHandlerPort > 0) {
-                              portFinder.markPortUnused(childChatHandlerPort);
-                            }
 
                             try {
                               if (!stopping && taskDir.exists()) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
index 79b5b0c..8ec7d46 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
@@ -19,27 +19,32 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketException;
+import java.util.List;
 import java.util.Set;
 
 public class PortFinder
 {
   private final Set<Integer> usedPorts = Sets.newHashSet();
   private final int startPort;
+  private final int endPort;
+  private final List<Integer> candidatePorts;
 
-  public PortFinder(int startPort)
+  public PortFinder(int startPort, int endPort, List<Integer> candidatePorts)
   {
     this.startPort = startPort;
+    this.endPort = endPort;
+    this.candidatePorts = candidatePorts;
   }
 
-  private static boolean canBind(int portNum)
+  @VisibleForTesting
+  boolean canBind(int portNum)
   {
     try {
       new ServerSocket(portNum).close();
@@ -55,39 +60,44 @@ public class PortFinder
 
   public synchronized int findUnusedPort()
   {
-    int port = chooseNext(startPort);
-    while (!canBind(port)) {
-      port = chooseNext(port + 1);
+    if (candidatePorts != null && !candidatePorts.isEmpty()) {
+      int port = chooseFromCandidates();
+      usedPorts.add(port);
+      return port;
+    } else {
+      int port = chooseNext(startPort);
+      while (!canBind(port)) {
+        port = chooseNext(port + 1);
+      }
+      usedPorts.add(port);
+      return port;
     }
-    usedPorts.add(port);
-    return port;
   }
 
-  public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
+  public synchronized void markPortUnused(int port)
   {
-    int firstPort = chooseNext(startPort);
-    while (!canBind(firstPort) || !canBind(firstPort + 1)) {
-      firstPort = chooseNext(firstPort + 1);
-    }
-    usedPorts.add(firstPort);
-    usedPorts.add(firstPort + 1);
-    return new Pair<>(firstPort, firstPort + 1);
+    usedPorts.remove(port);
   }
 
-  public synchronized void markPortUnused(int port)
+  private int chooseFromCandidates()
   {
-    usedPorts.remove(port);
+    for (int port : candidatePorts) {
+      if (!usedPorts.contains(port) && canBind(port)) {
+        return port;
+      }
+    }
+    throw new ISE("All ports are used...");
   }
 
   private int chooseNext(int start)
   {
-    // up to unsigned short max (65535)
-    for (int i = start; i <= 0xFFFF; i++) {
+    // up to endPort (whose default value is 65535)
+    for (int i = start; i <= endPort; i++) {
       if (!usedPorts.contains(i)) {
         return i;
       }
     }
-    throw new ISE("All ports are Used..");
+    throw new ISE("All ports are used...");
   }
 }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
index a8355cf..2f0add9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
@@ -63,6 +63,19 @@ public class ForkingTaskRunnerConfig
   private int startPort = 8100;
 
   @JsonProperty
+  @Min(1024)
+  @Max(65535)
+  private int endPort = 65535;
+
+  /**
+   * Task ports your services are going to use. If non-empty, ports for one 
task will be chosen from these ports.
+   * Otherwise, startPort and endPort are used to generate usable ports.
+   */
+  @JsonProperty
+  @NotNull
+  private List<Integer> ports = ImmutableList.of();
+
+  @JsonProperty
   @NotNull
   List<String> allowedPrefixes = Lists.newArrayList(
       "com.metamx",
@@ -74,14 +87,6 @@ public class ForkingTaskRunnerConfig
       "hadoop"
   );
 
-  @JsonProperty
-  private boolean separateIngestionEndpoint = false;
-
-  public boolean isSeparateIngestionEndpoint()
-  {
-    return separateIngestionEndpoint;
-  }
-
   public String getJavaCommand()
   {
     return javaCommand;
@@ -107,6 +112,16 @@ public class ForkingTaskRunnerConfig
     return startPort;
   }
 
+  public int getEndPort()
+  {
+    return endPort;
+  }
+
+  public List<Integer> getPorts()
+  {
+    return ports;
+  }
+
   public List<String> getAllowedPrefixes()
   {
     return allowedPrefixes;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
index b2db791..f57093c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/PortFinderTest.java
@@ -19,35 +19,55 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.collect.ImmutableList;
+import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+
 
 public class PortFinderTest
 {
-  private final PortFinder finder = new PortFinder(1200);
+  private final List<PortFinder> finders = new ArrayList<>();
+
+  @Before
+  public void setUp()
+  {
+    // use startPort and endPort to generate usable ports.
+    PortFinder finder1 = EasyMock.createMockBuilder(PortFinder.class)
+                                 .withConstructor(1200, 1201, 
ImmutableList.of())
+                                 .addMockedMethod("canBind")
+                                 .createMock();
+    // choose usable ports from candidates
+    PortFinder finder2 = EasyMock.createMockBuilder(PortFinder.class)
+                                 .withConstructor(1024, 1025, 
ImmutableList.of(1200, 1201))
+                                 .addMockedMethod("canBind")
+                                 .createMock();
+
+    finders.add(finder1);
+    finders.add(finder2);
+  }
 
   @Test
-  public void testUsedPort() throws IOException
+  public void testUsedPort()
   {
-    final int port1 = finder.findUnusedPort();
-    // verify that the port is free
-    ServerSocket socket1 = new ServerSocket(port1);
-    finder.markPortUnused(port1);
-    final int port2 = finder.findUnusedPort();
-    Assert.assertNotEquals("Used port is not reallocated", port1, port2);
-    // verify that port2 is free
-    ServerSocket socket2 = new ServerSocket(port2);
-
-    socket1.close();
-    // Now port1 should get recycled
-    Assert.assertEquals(port1, finder.findUnusedPort());
-
-    socket2.close();
-    finder.markPortUnused(port1);
-    finder.markPortUnused(port2);
+    for (PortFinder finder : finders) {
+      EasyMock.expect(finder.canBind(1200)).andReturn(true).andReturn(false);
+      EasyMock.expect(finder.canBind(1201)).andReturn(true);
+      EasyMock.replay(finder);
+
+      final int port1 = finder.findUnusedPort();
+      Assert.assertEquals(1200, port1);
+      finder.markPortUnused(port1);
+
+      final int port2 = finder.findUnusedPort();
+      Assert.assertEquals(1201, port2);
+      finder.markPortUnused(port2);
 
+      EasyMock.verify(finder);
+    }
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
index 1a7abc4..18845e7 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/ForkingTaskRunnerConfigTest.java
@@ -145,6 +145,31 @@ public class ForkingTaskRunnerConfigTest
     );
   }
 
+  @Test
+  public void testPorts() throws JsonProcessingException
+  {
+    final List<Integer> ports = ImmutableList.of(1024, 1025);
+    Assert.assertEquals(
+        ports,
+        buildFromProperties(
+            IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + 
".ports",
+            MAPPER.writeValueAsString(ports)
+        ).getPorts()
+    );
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionalPorts()
+  {
+    
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX 
+ ".ports", "not an Integer");
+  }
+
+  @Test(expected = ProvisionException.class)
+  public void testExceptionalPorts2()
+  {
+    
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX 
+ ".ports", "1024"); // not an array
+  }
+
   @Test(expected = ProvisionException.class)
   public void testExceptionalJavaOptArray()
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
index 268587e..2a1a0f5 100644
--- 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
+++ 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
@@ -26,13 +26,11 @@ import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.druid.guice.Jerseys;
-import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.RemoteChatHandler;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -47,10 +45,7 @@ import java.util.Properties;
  */
 public class ChatHandlerServerModule implements Module
 {
-  private static final Logger log = new Logger(ChatHandlerServerModule.class);
   private static final String MAX_CHAT_REQUESTS_PROPERTY = 
"druid.indexer.server.maxChatRequests";
-  private static final String CHAT_PORT_PROPERTY = 
"druid.indexer.task.chathandler.port";
-
   private final Properties properties;
 
   public ChatHandlerServerModule(Properties properties)
@@ -72,32 +67,12 @@ public class ChatHandlerServerModule implements Module
     Multibinder.newSetBinder(binder, 
ServletFilterHolder.class).addBinding().to(TaskIdResponseHeaderFilterHolder.class);
 
     /**
-     * If "druid.indexer.task.chathandler.port" property is set then we assume 
that a separate Jetty Server with its
-     * own {@link ServerConfig} is required for ingestion apart from the query 
server otherwise we bind
-     * {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal 
@}{@link Self} {@link DruidNode}
+     * We bind {@link DruidNode} annotated with {@link RemoteChatHandler} to 
{@literal @}{@link Self} {@link DruidNode}
      * so that same Jetty Server is used for querying as well as ingestion.
      */
-    if (properties.containsKey(CHAT_PORT_PROPERTY)) {
-      log.info("Spawning separate ingestion server at port [%s]", 
properties.getProperty(CHAT_PORT_PROPERTY));
-      JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", 
DruidNode.class, RemoteChatHandler.class);
-      JsonConfigProvider.bind(
-          binder,
-          "druid.indexer.server.chathandler.http",
-          ServerConfig.class,
-          RemoteChatHandler.class
-      );
-      JsonConfigProvider.bind(
-          binder,
-          "druid.indexer.server.chathandler.https",
-          TLSServerConfig.class,
-          RemoteChatHandler.class
-      );
-      LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
-    } else {
-      
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class,
 Self.class));
-      
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
-      
binder.bind(TLSServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(TLSServerConfig.class));
-    }
+    
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class,
 Self.class));
+    
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
+    
binder.bind(TLSServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(TLSServerConfig.class));
   }
 
   @Provides


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to