This is an automated email from the ASF dual-hosted git repository.
prasanthj pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new d9b3833 HIVE-20841: LLAP: Make dynamic ports configurable (Prasanth
Jayachandran reviewed by Sergey Shelukhin)
d9b3833 is described below
commit d9b3833e0c7b0ee008d1dd39bacc8f758170156f
Author: Prasanth Jayachandran <[email protected]>
AuthorDate: Tue Feb 12 00:28:09 2019 -0800
HIVE-20841: LLAP: Make dynamic ports configurable (Prasanth Jayachandran
reviewed by Sergey Shelukhin)
---
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 4 ++++
.../hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java | 8 ++++++--
.../apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java | 6 +++++-
.../hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java | 2 +-
.../hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java | 7 +++++--
5 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a04ef38..3bb482f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4010,6 +4010,8 @@ public class HiveConf extends Configuration {
LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
"Number of RPC handlers for LLAP daemon.",
"llap.daemon.rpc.num.handlers"),
+ LLAP_PLUGIN_RPC_PORT("hive.llap.plugin.rpc.port", 0,
+ "Port to use for LLAP plugin rpc server"),
LLAP_PLUGIN_RPC_NUM_HANDLERS("hive.llap.plugin.rpc.num.handlers", 1,
"Number of RPC handlers for AM LLAP plugin endpoint."),
LLAP_DAEMON_WORK_DIRS("hive.llap.daemon.work.dirs", "",
@@ -4182,6 +4184,8 @@ public class HiveConf extends Configuration {
"Sleep duration (in milliseconds) to wait before retrying on error when
obtaining a\n" +
"connection to LLAP daemon from Tez AM.",
"llap.task.communicator.connection.sleep-between-retries-millis"),
+ LLAP_TASK_UMBILICAL_SERVER_PORT("hive.llap.daemon.umbilical.port", 0,
+ "LLAP task umbilical server RPC port"),
LLAP_DAEMON_WEB_PORT("hive.llap.daemon.web.port", 15002, "LLAP daemon web
UI port.",
"llap.daemon.service.port"),
LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false,
diff --git
a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 89cb6fb..a16c0af 100644
---
a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++
b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.ipc.RPC;
@@ -53,11 +54,14 @@ public class LlapTaskUmbilicalServer {
public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol
umbilical, int numHandlers) throws IOException {
jobTokenSecretManager = new JobTokenSecretManager();
-
+ int umbilicalPort = HiveConf.getIntVar(conf,
HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT);
+ if (umbilicalPort <= 0) {
+ umbilicalPort = 0;
+ }
server = new RPC.Builder(conf)
.setProtocol(LlapTaskUmbilicalProtocol.class)
.setBindAddress("0.0.0.0")
- .setPort(0)
+ .setPort(umbilicalPort)
.setInstance(umbilical)
.setNumHandlers(numHandlers)
.setSecretManager(jobTokenSecretManager).build();
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5d4ce22..2dfd359 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -254,10 +254,14 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
int numHandlers =
HiveConf.getIntVar(conf,
ConfVars.LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT);
+ int umbilicalPort = HiveConf.getIntVar(conf,
ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT);
+ if (umbilicalPort <= 0) {
+ umbilicalPort = 0;
+ }
server = new RPC.Builder(conf)
.setProtocol(LlapTaskUmbilicalProtocol.class)
.setBindAddress("0.0.0.0")
- .setPort(0)
+ .setPort(umbilicalPort)
.setInstance(umbilical)
.setNumHandlers(numHandlers)
.setSecretManager(jobTokenSecretManager).build();
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 8217964..0ea4c09 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -317,7 +317,7 @@ public class LlapTaskSchedulerService extends TaskScheduler
{
serializedToken = jobIdForToken = null;
}
pluginEndpoint = new LlapPluginServerImpl(sm,
- HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS),
this);
+ HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS),
this, HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT));
} else {
serializedToken = jobIdForToken = null;
pluginEndpoint = null;
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
index e9a011a..6e6785e 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
@@ -42,13 +42,16 @@ public class LlapPluginServerImpl extends AbstractService
implements LlapPluginP
private final int numHandlers;
private final LlapTaskSchedulerService parent;
private final AtomicReference<InetSocketAddress> bindAddress = new
AtomicReference<>();
+ private final int port;
public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager,
- int numHandlers, LlapTaskSchedulerService parent) {
+ int numHandlers, LlapTaskSchedulerService parent, int port) {
super("LlapPluginServerImpl");
this.secretManager = secretManager;
this.numHandlers = numHandlers;
this.parent = parent;
+ this.port = port <= 0 ? 0 : port;
+ LOG.info("Llap plugin server using port: {} #handlers: {}", port,
numHandlers);
}
@Override
@@ -63,7 +66,7 @@ public class LlapPluginServerImpl extends AbstractService
implements LlapPluginP
final Configuration conf = getConfig();
final BlockingService daemonImpl =
LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this);
- server = LlapUtil.startProtocolServer(0, numHandlers, bindAddress , conf,
daemonImpl,
+ server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress ,
conf, daemonImpl,
LlapPluginProtocolPB.class, secretManager, new
LlapPluginPolicyProvider(),
ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY);
LOG.info("Starting the plugin endpoint on port " +
bindAddress.get().getPort());