Author: sershe
Date: Tue Feb 24 00:12:44 2015
New Revision: 1661823
URL: http://svn.apache.org/r1661823
Log:
HIVE-9759 : LLAP: Update launcher, scheduler to work with Tez changes
(Siddharth Seth)
Modified:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
Modified:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1661823&r1=1661822&r2=1661823&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java Tue Feb 24 00:12:44 2015
@@ -212,7 +212,7 @@ public class ContainerRunnerImpl extends
request.getContainerIdString(),
request.getTokenIdentifier(), request.getAppAttemptNumber(),
workingDir, localDirs,
envMap, objectRegistry, pid,
- executionContext, credentials, memoryAvailable,
request.getUser());
+ executionContext, credentials, memoryAvailable,
request.getUser(), null);
ContainerExecutionResult result = tezChild.run();
LOG.info("ExecutionTime for Container: " +
request.getContainerIdString() + "=" +
sw.stop().elapsedMillis());
Modified:
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java?rev=1661823&r1=1661822&r2=1661823&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java Tue Feb 24 00:12:44 2015
@@ -14,6 +14,7 @@
package org.apache.tez.dag.app.launcher;
+import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -103,9 +104,10 @@ public class DaemonContainerLauncher ext
switch (event.getType()) {
case CONTAINER_LAUNCH_REQUEST:
NMCommunicatorLaunchRequestEvent launchEvent =
(NMCommunicatorLaunchRequestEvent) event;
+ InetSocketAddress address =
tal.getTaskCommunicator(launchEvent.getTaskCommId()).getAddress();
ListenableFuture<Void> future = executor.submit(
new SubmitCallable(getProxy(launchEvent.getNodeId().getHost()),
launchEvent,
- tokenIdentifier, tal.getAddress().getHostName(),
tal.getAddress().getPort()));
+ tokenIdentifier, address.getHostName(), address.getPort()));
Futures.addCallback(future, new
SubmitCallback(launchEvent.getContainerId(),
launchEvent.getContainer().getNodeId().getHost()));
break;
Modified:
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java?rev=1661823&r1=1661822&r2=1661823&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java Tue Feb 24 00:12:44 2015
@@ -14,11 +14,12 @@
package org.apache.tez.dag.app.rm;
-import java.io.IOException;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -31,25 +32,17 @@ import com.google.common.util.concurrent
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
-// TODO Registration with RM - so that the AM is considered dead and restarted
in the expiry interval - 10 minutes.
-
public class DaemonTaskSchedulerService extends TaskSchedulerService {
private static final Log LOG =
LogFactory.getLog(DaemonTaskSchedulerService.class);
@@ -58,6 +51,7 @@ public class DaemonTaskSchedulerService
private final TaskSchedulerAppCallback appClientDelegate;
private final AppContext appContext;
private final List<String> serviceHosts;
+ private final Set<String> serviceHostSet;
private final ContainerFactory containerFactory;
private final Random random = new Random();
@@ -68,8 +62,6 @@ public class DaemonTaskSchedulerService
private final ConcurrentMap<Object, ContainerId> runningTasks =
new ConcurrentHashMap<Object, ContainerId>();
- private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
-
// Per daemon
private final int memoryPerInstance;
private final int coresPerInstance;
@@ -81,6 +73,7 @@ public class DaemonTaskSchedulerService
public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient,
AppContext appContext,
String clientHostname, int clientPort,
String trackingUrl,
+ long customAppIdIdentifier,
Configuration conf) {
// Accepting configuration here to allow setting up fields as final
super(DaemonTaskSchedulerService.class.getName());
@@ -88,7 +81,8 @@ public class DaemonTaskSchedulerService
this.appClientDelegate = createAppCallbackDelegate(appClient);
this.appContext = appContext;
this.serviceHosts = new LinkedList<String>();
- this.containerFactory = new ContainerFactory(appContext);
+ this.serviceHostSet = new HashSet<>();
+ this.containerFactory = new ContainerFactory(appContext,
customAppIdIdentifier);
this.memoryPerInstance = conf
.getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
@@ -104,7 +98,6 @@ public class DaemonTaskSchedulerService
int memoryPerExecutor = (int) (memoryPerInstance / (float)
executorsPerInstance);
int coresPerExecutor = (int) (coresPerInstance / (float)
executorsPerInstance);
this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor,
coresPerExecutor);
- this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new
FakeAmRmCallbackHandler());
String[] hosts =
conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_AM_SERVICE_HOSTS);
if (hosts == null || hosts.length == 0) {
@@ -112,6 +105,7 @@ public class DaemonTaskSchedulerService
}
for (String host : hosts) {
serviceHosts.add(host);
+ serviceHostSet.add(host);
}
LOG.info("Running with configuration: " +
@@ -125,35 +119,15 @@ public class DaemonTaskSchedulerService
@Override
public void serviceInit(Configuration conf) {
- amRmClient.init(conf);
}
@Override
public void serviceStart() {
- amRmClient.start();
- RegisterApplicationMasterResponse response;
- try {
- amRmClient.registerApplicationMaster(clientHostname, clientPort,
trackingUrl);
- } catch (YarnException e) {
- throw new TezUncheckedException(e);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
}
@Override
public void serviceStop() {
if (!this.isStopped.getAndSet(true)) {
-
- try {
- TaskSchedulerAppCallback.AppFinalStatus status =
appClientDelegate.getFinalAppStatus();
- amRmClient.unregisterApplicationMaster(status.exitStatus,
status.exitMessage,
- status.postCompletionTrackingUrl);
- } catch (YarnException e) {
- throw new TezUncheckedException(e);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
appCallbackExecutor.shutdownNow();
}
}
@@ -257,8 +231,23 @@ public class DaemonTaskSchedulerService
if (requestedHosts != null && requestedHosts.length > 0) {
Arrays.sort(requestedHosts);
host = requestedHosts[0];
- LOG.info("Selected host: " + host + " from requested hosts: " +
Arrays.toString(requestedHosts));
- } else {
+ if (serviceHostSet.contains(host)) {
+ LOG.info("Selected host: " + host + " from requested hosts: " +
Arrays.toString(requestedHosts));
+ } else {
+ LOG.info("Preferred host: " + host + " not present. Attempting to
select another one");
+ host = null;
+ for (String h : requestedHosts) {
+ if (serviceHostSet.contains(h)) {
+ host = h;
+ break;
+ }
+ }
+ if (host == null) {
+ LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + "
not present. Randomizing the host");
+ }
+ }
+ }
+ if (host == null) {
host = serviceHosts.get(random.nextInt(serviceHosts.size()));
LOG.info("Selected random host: " + host + " since the request contained
no host information");
}
@@ -266,17 +255,19 @@ public class DaemonTaskSchedulerService
}
static class ContainerFactory {
- final AppContext appContext;
+ final ApplicationAttemptId customAppAttemptId;
AtomicInteger nextId;
- public ContainerFactory(AppContext appContext) {
- this.appContext = appContext;
+ public ContainerFactory(AppContext appContext, long appIdLong) {
this.nextId = new AtomicInteger(1);
+ ApplicationId appId = ApplicationId
+ .newInstance(appIdLong,
appContext.getApplicationAttemptId().getApplicationId().getId());
+ this.customAppAttemptId = ApplicationAttemptId
+ .newInstance(appId,
appContext.getApplicationAttemptId().getAttemptId());
}
public Container createContainer(Resource capability, Priority priority,
String hostname) {
- ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
- ContainerId containerId = ContainerId.newInstance(appAttemptId,
nextId.getAndIncrement());
+ ContainerId containerId = ContainerId.newInstance(customAppAttemptId,
nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance(hostname, 0);
String nodeHttpAddress = "hostname:0";
@@ -290,37 +281,4 @@ public class DaemonTaskSchedulerService
return container;
}
}
-
- private static class FakeAmRmCallbackHandler implements
AMRMClientAsync.CallbackHandler {
-
- @Override
- public void onContainersCompleted(List<ContainerStatus> statuses) {
-
- }
-
- @Override
- public void onContainersAllocated(List<Container> containers) {
-
- }
-
- @Override
- public void onShutdownRequest() {
-
- }
-
- @Override
- public void onNodesUpdated(List<NodeReport> updatedNodes) {
-
- }
-
- @Override
- public float getProgress() {
- return 0;
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
- }
}