YARN-3039. Implemented the app-level timeline aggregator discovery service. Contributed by Junping Du.
(cherry picked from commit 8a637914c13baae6749b481551901cfac94694f4) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dbfc0537 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dbfc0537 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dbfc0537 Branch: refs/heads/YARN-2928 Commit: dbfc053735884bf283e8578e5f5b42531e694543 Parents: 505e3f6 Author: Zhijie Shen <[email protected]> Authored: Tue Mar 17 20:23:49 2015 -0700 Committer: Vinod Kumar Vavilapalli <[email protected]> Committed: Fri Aug 14 11:23:22 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../api/protocolrecords/AllocateResponse.java | 33 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 12 + .../src/main/proto/yarn_service_protos.proto | 1 + .../distributedshell/ApplicationMaster.java | 81 ++++- .../hadoop/yarn/client/api/AMRMClient.java | 18 + .../yarn/client/api/async/AMRMClientAsync.java | 17 + .../api/async/impl/AMRMClientAsyncImpl.java | 15 +- .../impl/pb/AllocateResponsePBImpl.java | 17 + .../hadoop/yarn/client/api/TimelineClient.java | 6 +- .../client/api/impl/TimelineClientImpl.java | 133 ++++++- .../hadoop/yarn/webapp/util/WebAppUtils.java | 2 +- .../src/main/resources/yarn-default.xml | 13 + .../hadoop/yarn/TestContainerLaunchRPC.java | 16 +- .../java/org/apache/hadoop/yarn/TestRPC.java | 247 ------------- .../hadoop/yarn/api/TestAllocateResponse.java | 17 + .../hadoop-yarn-server-common/pom.xml | 1 + .../api/AggregatorNodemanagerProtocol.java | 56 +++ .../api/AggregatorNodemanagerProtocolPB.java | 33 ++ ...gregatorNodemanagerProtocolPBClientImpl.java | 94 +++++ ...regatorNodemanagerProtocolPBServiceImpl.java | 61 ++++ .../protocolrecords/NodeHeartbeatRequest.java | 22 ++ .../protocolrecords/NodeHeartbeatResponse.java | 4 + .../ReportNewAggregatorsInfoRequest.java | 53 +++ .../ReportNewAggregatorsInfoResponse.java | 32 ++ 
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 59 ++++ .../impl/pb/NodeHeartbeatResponsePBImpl.java | 47 +++ .../ReportNewAggregatorsInfoRequestPBImpl.java | 142 ++++++++ .../ReportNewAggregatorsInfoResponsePBImpl.java | 74 ++++ .../server/api/records/AppAggregatorsMap.java | 33 ++ .../impl/pb/AppAggregatorsMapPBImpl.java | 151 ++++++++ .../proto/aggregatornodemanager_protocol.proto | 29 ++ .../yarn_server_common_service_protos.proto | 21 ++ .../java/org/apache/hadoop/yarn/TestRPC.java | 345 +++++++++++++++++++ .../hadoop/yarn/TestYarnServerApiClasses.java | 17 + .../hadoop/yarn/server/nodemanager/Context.java | 13 + .../yarn/server/nodemanager/NodeManager.java | 45 ++- .../nodemanager/NodeStatusUpdaterImpl.java | 7 +- .../aggregatormanager/NMAggregatorService.java | 113 ++++++ .../application/ApplicationImpl.java | 4 + .../ApplicationMasterService.java | 6 + .../resourcemanager/ResourceTrackerService.java | 67 +++- .../server/resourcemanager/rmapp/RMApp.java | 17 + .../rmapp/RMAppAggregatorUpdateEvent.java | 36 ++ .../resourcemanager/rmapp/RMAppEventType.java | 3 + .../server/resourcemanager/rmapp/RMAppImpl.java | 51 ++- .../applicationsmanager/MockAsm.java | 12 + .../server/resourcemanager/rmapp/MockRMApp.java | 15 + .../PerNodeTimelineAggregatorsAuxService.java | 5 +- .../TimelineAggregatorsCollection.java | 78 ++++- .../TestTimelineAggregatorsCollection.java | 11 +- 51 files changed, 2098 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 42b27b8..86e6d23 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -29,6 +29,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3264. 
Created backing storage write interface and a POC only FS based storage implementation. (Vrushali C via zjshen) + YARN-3039. Implemented the app-level timeline aggregator discovery service. + (Junping Du via zjshen) + IMPROVEMENTS OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index c4fdb79..421c2a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -120,6 +120,25 @@ public abstract class AllocateResponse { response.setAMRMToken(amRMToken); return response; } + + @Public + @Unstable + public static AllocateResponse newInstance(int responseId, + List<ContainerStatus> completedContainers, + List<Container> allocatedContainers, List<NodeReport> updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, + List<ContainerResourceIncrease> increasedContainers, + List<ContainerResourceDecrease> decreasedContainers, + String aggregatorAddr) { + AllocateResponse response = + newInstance(responseId, completedContainers, allocatedContainers, + updatedNodes, availResources, command, numClusterNodes, preempt, + nmTokens, increasedContainers, decreasedContainers); + response.setAMRMToken(amRMToken); + response.setAggregatorAddr(aggregatorAddr); + return response; + } /** * If the 
<code>ResourceManager</code> needs the @@ -304,4 +323,18 @@ public abstract class AllocateResponse { @Private @Unstable public abstract void setAMRMToken(Token amRMToken); + + /** + * The address of aggregator that belongs to this app + * + * @return The address of aggregator that belongs to this attempt + */ + @Public + @Unstable + public abstract String getAggregatorAddr(); + + @Private + @Unstable + public abstract void setAggregatorAddr(String aggregatorAddr); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9572e3f..6101bfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -671,6 +671,11 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "container-manager.thread-count"; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; + /** Number of threads aggregator service uses.*/ + public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT = + NM_PREFIX + "aggregator-service.thread-count"; + public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5; + /** Number of threads used in cleanup.*/ public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; @@ -698,6 +703,13 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; + /** Address where the aggregator service 
IPC is.*/ + public static final String NM_AGGREGATOR_SERVICE_ADDRESS = + NM_PREFIX + "aggregator-service.address"; + public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048; + public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS = + "0.0.0.0:" + DEFAULT_NM_AGGREGATOR_SERVICE_PORT; + /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 098785a..eddf839 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -87,6 +87,7 @@ message AllocateResponseProto { repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; + optional string aggregator_addr = 13; } enum SchedulerResourceTypes { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index fcf1556..b728502 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -40,6 +40,9 @@ import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -102,6 +105,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * An ApplicationMaster for executing shell commands on a set of launched @@ -214,6 +218,13 @@ public class ApplicationMaster { private String appMasterTrackingUrl = ""; private boolean newTimelineService = false; + + // For posting entities in new timeline service in a non-blocking way + // TODO replace with event loop in TimelineClient. + private static ExecutorService threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); // App Master configuration // No. 
of containers to run shell command on @@ -303,6 +314,19 @@ public class ApplicationMaster { } appMaster.run(); result = appMaster.finish(); + + threadPool.shutdown(); + + while (!threadPool.isTerminated()) { // wait for all posting thread to finish + try { + if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); // send interrupt to hurry them along + } + } catch (InterruptedException e) { + LOG.warn("Timeline client service stop interrupted!"); + break; + } + } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -586,13 +610,15 @@ public class ApplicationMaster { amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); - + containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); startTimelineClient(conf); + // need to bind timelineClient + amRMClient.registerTimelineClient(timelineClient); if(timelineClient != null) { if (newTimelineService) { publishApplicationAttemptEventOnNewTimelineService(timelineClient, @@ -674,7 +700,12 @@ public class ApplicationMaster { if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { // Creating the Timeline Client - timelineClient = TimelineClient.createTimelineClient(); + if (newTimelineService) { + timelineClient = TimelineClient.createTimelineClient( + appAttemptID.getApplicationId()); + } else { + timelineClient = TimelineClient.createTimelineClient(); + } timelineClient.init(conf); timelineClient.start(); } else { @@ -764,7 +795,7 @@ public class ApplicationMaster { if(timelineClient != null) { timelineClient.stop(); } - + return success; } @@ -1260,6 +1291,18 @@ public class ApplicationMaster { } private static void publishContainerStartEventOnNewTimelineService( + final TimelineClient timelineClient, final Container container, + 
final String domainId, final UserGroupInformation ugi) { + Runnable publishWrapper = new Runnable() { + public void run() { + publishContainerStartEventOnNewTimelineServiceBase(timelineClient, + container, domainId, ugi); + } + }; + threadPool.execute(publishWrapper); + } + + private static void publishContainerStartEventOnNewTimelineServiceBase( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = @@ -1291,10 +1334,22 @@ public class ApplicationMaster { e instanceof UndeclaredThrowableException ? e.getCause() : e); } } - + private static void publishContainerEndEventOnNewTimelineService( - final TimelineClient timelineClient, ContainerStatus container, - String domainId, UserGroupInformation ugi) { + final TimelineClient timelineClient, final ContainerStatus container, + final String domainId, final UserGroupInformation ugi) { + Runnable publishWrapper = new Runnable() { + public void run() { + publishContainerEndEventOnNewTimelineServiceBase(timelineClient, + container, domainId, ugi); + } + }; + threadPool.execute(publishWrapper); + } + + private static void publishContainerEndEventOnNewTimelineServiceBase( + final TimelineClient timelineClient, final ContainerStatus container, + final String domainId, final UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); @@ -1325,6 +1380,20 @@ public class ApplicationMaster { } private static void publishApplicationAttemptEventOnNewTimelineService( + final TimelineClient timelineClient, final String appAttemptId, + final DSEvent appEvent, final String domainId, + final UserGroupInformation ugi) { + + Runnable publishWrapper = new Runnable() { + public void run() { + 
publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, + appAttemptId, appEvent, domainId, ugi); + } + }; + threadPool.execute(publishWrapper); + } + + private static void publishApplicationAttemptEventOnNewTimelineServiceBase( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index bfe10d6..56f2b10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -47,6 +47,8 @@ import com.google.common.collect.ImmutableList; public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClient.class); + + private TimelineClient timelineClient; /** * Create a new instance of AMRMClient. @@ -374,6 +376,22 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends } /** + * Register TimelineClient to AMRMClient. + * @param timelineClient + */ + public void registerTimelineClient(TimelineClient timelineClient) { + this.timelineClient = timelineClient; + } + + /** + * Get registered timeline client. 
+ * @return + */ + public TimelineClient getRegisteredTimeineClient() { + return this.timelineClient; + } + + /** * Wait for <code>check</code> to return true for each 1000 ms. * See also {@link #waitFor(com.google.common.base.Supplier, int)} * and {@link #waitFor(com.google.common.base.Supplier, int, int)} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index f62e71b..be5610e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; @@ -193,6 +194,22 @@ extends AbstractService { * @return Current number of nodes in the cluster */ public abstract int getClusterNodeCount(); + + /** + * Register TimelineClient to AMRMClient. + * @param timelineClient + */ + public void registerTimelineClient(TimelineClient timelineClient) { + client.registerTimelineClient(timelineClient); + } + + /** + * Get registered timeline client. 
+ * @return + */ + public TimelineClient getRegisteredTimeineClient() { + return client.getRegisteredTimeineClient(); + } /** * Update application's blacklist with addition or removal resources. http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index addc3b6..f0f0bc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -65,6 +66,8 @@ extends AMRMClientAsync<T> { private volatile boolean keepRunning; private volatile float progress; + private volatile String aggregatorAddr; + private volatile Throwable savedException; public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) { @@ -304,7 +307,17 @@ extends AMRMClientAsync<T> { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } - + + String 
aggregatorAddress = response.getAggregatorAddr(); + TimelineClient timelineClient = client.getRegisteredTimeineClient(); + if (timelineClient != null && aggregatorAddress != null + && !aggregatorAddress.isEmpty()) { + if (aggregatorAddr == null || + !aggregatorAddr.equals(aggregatorAddress)) { + aggregatorAddr = aggregatorAddress; + timelineClient.setTimelineServiceAddress(aggregatorAddress); + } + } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index f2796fd..605c29a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -384,6 +384,23 @@ public class AllocateResponsePBImpl extends AllocateResponse { } this.amrmToken = amRMToken; } + + + @Override + public String getAggregatorAddr() { + AllocateResponseProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getAggregatorAddr(); + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + maybeInitBuilder(); + if (aggregatorAddr == null) { + builder.clearAggregatorAddr(); + return; + } + builder.setAggregatorAddr(aggregatorAddr); + } private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index f7f6fc0..27ef3e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -52,7 +52,6 @@ public abstract class TimelineClient extends AbstractService { * @return a timeline client */ protected ApplicationId contextAppId; - protected String timelineServiceAddress; @Public public static TimelineClient createTimelineClient() { @@ -195,7 +194,6 @@ public abstract class TimelineClient extends AbstractService { * @param address * the timeline service address */ - public void setTimelineServiceAddress(String address) { - timelineServiceAddress = address; - } + public abstract void setTimelineServiceAddress(String address); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 5bdc3b2..8992422 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -116,6 +116,15 @@ public class TimelineClientImpl extends TimelineClient { private URI resURI; private UserGroupInformation authUgi; private String doAsUser; + + private volatile String timelineServiceAddress; + + // Retry parameters for identifying new timeline service + // TODO consider to merge with connection retry + private int maxServiceRetries; + private long serviceRetryInterval; + + private boolean newTimelineService = false; @Private @VisibleForTesting @@ -261,6 +270,7 @@ public class TimelineClientImpl extends TimelineClient { public TimelineClientImpl(ApplicationId applicationId) { super(TimelineClientImpl.class.getName(), applicationId); + this.newTimelineService = true; } protected void serviceInit(Configuration conf) throws Exception { @@ -288,18 +298,32 @@ public class TimelineClientImpl extends TimelineClient { client = new Client(new URLConnectionClientHandler( new TimelineURLConnectionFactory()), cc); TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter(); - client.addFilter(retryFilter); + // TODO need to cleanup filter retry later. 
+ if (!newTimelineService) { + client.addFilter(retryFilter); + } - if (YarnConfiguration.useHttps(conf)) { - timelineServiceAddress = conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); + // old version timeline service need to get address from configuration + // while new version need to auto discovery (with retry). + if (newTimelineService) { + maxServiceRetries = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + serviceRetryInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); } else { - timelineServiceAddress = conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); - } - LOG.info("Timeline service address: " + timelineServiceAddress); + if (YarnConfiguration.useHttps(conf)) { + setTimelineServiceAddress(conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS)); + } else { + setTimelineServiceAddress(conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)); + } + LOG.info("Timeline service address: " + getTimelineServiceAddress()); + } super.serviceInit(conf); } @@ -342,8 +366,7 @@ public class TimelineClientImpl extends TimelineClient { if (async) { params.add("async", Boolean.TRUE.toString()); } - putObjects(constructResURI(getConfig(), timelineServiceAddress, true), - "entities", params, entitiesContainer); + putObjects("entities", params, entitiesContainer); } @Override @@ -351,6 +374,60 @@ public class TimelineClientImpl extends TimelineClient { YarnException { doPosting(domain, "domain"); } + + // Used for new timeline service only + @Private + public void 
putObjects(String path, MultivaluedMap<String, String> params, + Object obj) throws IOException, YarnException { + + // timelineServiceAddress could haven't be initialized yet + // or stale (only for new timeline service) + int retries = pollTimelineServiceAddress(this.maxServiceRetries); + + // timelineServiceAddress could be stale, add retry logic here. + boolean needRetry = true; + while (needRetry) { + try { + URI uri = constructResURI(getConfig(), timelineServiceAddress, true); + putObjects(uri, path, params, obj); + needRetry = false; + } + catch (Exception e) { + // TODO only handle exception for timelineServiceAddress being updated. + // skip retry for other exceptions. + checkRetryWithSleep(retries, e); + retries--; + } + } + } + + /** + * Check if reaching to maximum of retries. + * @param retries + * @param e + */ + private void checkRetryWithSleep(int retries, Exception e) throws + YarnException, IOException { + if (retries > 0) { + try { + Thread.sleep(this.serviceRetryInterval); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } else { + LOG.error( + "TimelineClient has reached to max retry times :" + + this.maxServiceRetries + " for service address: " + + timelineServiceAddress); + if (e instanceof YarnException) { + throw (YarnException)e; + } else if (e instanceof IOException) { + throw (IOException)e; + } else { + throw new YarnException(e); + } + } + } private void putObjects( URI base, String path, MultivaluedMap<String, String> params, Object obj) @@ -411,6 +488,15 @@ public class TimelineClientImpl extends TimelineClient { } return resp; } + + @Override + public void setTimelineServiceAddress(String address) { + this.timelineServiceAddress = address; + } + + private String getTimelineServiceAddress() { + return this.timelineServiceAddress; + } @SuppressWarnings("unchecked") @Override @@ -425,8 +511,10 @@ public class TimelineClientImpl extends TimelineClient { DelegationTokenAuthenticatedURL authUrl = new 
DelegationTokenAuthenticatedURL(authenticator, connConfigurator); + // TODO we should add retry logic here if timelineServiceAddress is + // not available immediately. return (Token) authUrl.getDelegationToken( - constructResURI(getConfig(), timelineServiceAddress, false).toURL(), + constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(), token, renewer, doAsUser); } }; @@ -536,6 +624,7 @@ public class TimelineClientImpl extends TimelineClient { return connectionRetry.retryOn(tokenRetryOp); } + // Old timeline service, no external retry logic. @Private @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { @@ -553,6 +642,24 @@ public class TimelineClientImpl extends TimelineClient { throw new YarnRuntimeException("Unknown resource type"); } } + + /** + * Poll TimelineServiceAddress for maximum of retries times if it is null + * @param retries + * @return the left retry times + */ + private int pollTimelineServiceAddress(int retries) { + while (timelineServiceAddress == null && retries > 0) { + try { + Thread.sleep(this.serviceRetryInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + timelineServiceAddress = getTimelineServiceAddress(); + retries--; + } + return retries; + } private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java index 459c110..8597427 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java @@ -201,7 +201,7 @@ public class WebAppUtils { return getResolvedAddress(address); } - private static String getResolvedAddress(InetSocketAddress address) { + public static String getResolvedAddress(InetSocketAddress address) { address = NetUtils.getConnectAddress(address); StringBuilder sb = new StringBuilder(); InetAddress resolved = address.getAddress(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 402377d..2f16110 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -927,6 +927,12 @@ <name>yarn.nodemanager.container-manager.thread-count</name> <value>20</value> </property> + + <property> + <description>Number of threads aggregator service uses.</description> + <name>yarn.nodemanager.aggregator-service.thread-count</name> + <value>5</value> + </property> <property> <description>Number of threads used in cleanup.</description> @@ -995,6 +1001,13 @@ <name>yarn.nodemanager.localizer.address</name> <value>${yarn.nodemanager.hostname}:8040</value> </property> + + + <property> + <description>Address where the aggregator service IPC is.</description> + <name>yarn.nodemanager.aggregator-service.address</name> + <value>${yarn.nodemanager.hostname}:8048</value> + </property> <property> <description>Interval in between cache 
cleanups.</description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index e2071dd..26d6d04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -105,7 +106,7 @@ public class TestContainerLaunchRPC { resource, System.currentTimeMillis() + 10000, 42, 42, Priority.newInstance(0), 0); Token containerToken = - TestRPC.newContainerToken(nodeId, "password".getBytes(), + newContainerToken(nodeId, "password".getBytes(), containerTokenIdentifier); StartContainerRequest scRequest = @@ -130,6 +131,19 @@ public class TestContainerLaunchRPC { Assert.fail("timeout exception should have occurred!"); } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = + NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this 
becomes a "real" token + Token containerToken = + Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } public class DummyContainerManager implements ContainerManagementProtocol { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java deleted file mode 100644 index 39e6162..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ /dev/null @@ -1,247 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.hadoop.yarn; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.factories.RecordFactory; 
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Assert; -import org.junit.Test; - -public class TestRPC { - - private static final String EXCEPTION_MSG = "test error"; - private static final String EXCEPTION_CAUSE = "exception cause"; - private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - @Test - public void testUnknownCall() { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class - .getName()); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - Server server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, null, 1); - server.start(); - - // Any unrelated protocol would do - ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy( - ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); - - try { - proxy.getNewApplication(Records - .newRecord(GetNewApplicationRequest.class)); - Assert.fail("Excepted RPC call to fail with unknown method."); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().matches( - "Unknown method getNewApplication called on.*" - + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" - + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testHadoopProtoRPC() throws Exception { - test(HadoopYarnProtoRPC.class.getName()); - } - - private void test(String rpcClass) throws Exception { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, 
rpcClass); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - Server server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, null, 1); - server.start(); - RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class); - ContainerManagementProtocol proxy = (ContainerManagementProtocol) - rpc.getProxy(ContainerManagementProtocol.class, - NetUtils.getConnectAddress(server), conf); - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - ApplicationId applicationId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId applicationAttemptId = - ApplicationAttemptId.newInstance(applicationId, 0); - ContainerId containerId = - ContainerId.newContainerId(applicationAttemptId, 100); - NodeId nodeId = NodeId.newInstance("localhost", 1234); - Resource resource = Resource.newInstance(1234, 2); - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(containerId, "localhost", "user", - resource, System.currentTimeMillis() + 10000, 42, 42, - Priority.newInstance(0), 0); - Token containerToken = newContainerToken(nodeId, "password".getBytes(), - containerTokenIdentifier); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - containerToken); - List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - proxy.startContainers(allRequests); - - List<ContainerId> containerIds = new ArrayList<ContainerId>(); - containerIds.add(containerId); - GetContainerStatusesRequest gcsRequest = - GetContainerStatusesRequest.newInstance(containerIds); - GetContainerStatusesResponse response = - proxy.getContainerStatuses(gcsRequest); - List<ContainerStatus> statuses = 
response.getContainerStatuses(); - - //test remote exception - boolean exception = false; - try { - StopContainersRequest stopRequest = - recordFactory.newRecordInstance(StopContainersRequest.class); - stopRequest.setContainerIds(containerIds); - proxy.stopContainers(stopRequest); - } catch (YarnException e) { - exception = true; - Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG)); - Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE)); - System.out.println("Test Exception is " + e.getMessage()); - } catch (Exception ex) { - ex.printStackTrace(); - } - Assert.assertTrue(exception); - - server.stop(); - Assert.assertNotNull(statuses.get(0)); - Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState()); - } - - public class DummyContainerManager implements ContainerManagementProtocol { - - private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>(); - - @Override - public GetContainerStatusesResponse getContainerStatuses( - GetContainerStatusesRequest request) - throws YarnException { - GetContainerStatusesResponse response = - recordFactory.newRecordInstance(GetContainerStatusesResponse.class); - response.setContainerStatuses(statuses); - return response; - } - - @Override - public StartContainersResponse startContainers( - StartContainersRequest requests) throws YarnException { - StartContainersResponse response = - recordFactory.newRecordInstance(StartContainersResponse.class); - for (StartContainerRequest request : requests.getStartContainerRequests()) { - Token containerToken = request.getContainerToken(); - ContainerTokenIdentifier tokenId = null; - - try { - tokenId = newContainerTokenIdentifier(containerToken); - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); - } - ContainerStatus status = - recordFactory.newRecordInstance(ContainerStatus.class); - status.setState(ContainerState.RUNNING); - status.setContainerId(tokenId.getContainerID()); - status.setExitStatus(0); - statuses.add(status); - - } - 
return response; - } - - @Override - public StopContainersResponse stopContainers(StopContainersRequest request) - throws YarnException { - Exception e = new Exception(EXCEPTION_MSG, - new Exception(EXCEPTION_CAUSE)); - throw new YarnException(e); - } - } - - public static ContainerTokenIdentifier newContainerTokenIdentifier( - Token containerToken) throws IOException { - org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token = - new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>( - containerToken.getIdentifier() - .array(), containerToken.getPassword().array(), new Text( - containerToken.getKind()), - new Text(containerToken.getService())); - return token.decodeIdentifier(); - } - - public static Token newContainerToken(NodeId nodeId, byte[] password, - ContainerTokenIdentifier tokenIdentifier) { - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = - NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); - // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token - Token containerToken = - Token.newInstance(tokenIdentifier.getBytes(), - ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return containerToken; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index fbe9af9..ef0bdcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -111,4 +111,21 @@ public class TestAllocateResponse { Assert.assertEquals(0, r.getIncreasedContainers().size()); Assert.assertEquals(0, r.getDecreasedContainers().size()); } + + @SuppressWarnings("deprecation") + @Test + public void testAllocateResponseWithAggregatorAddress() { + final String aggregatorAddr = "localhost:0"; + AllocateResponse r = + AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(), + new ArrayList<Container>(), new ArrayList<NodeReport>(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, + null, null, aggregatorAddr); + + AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); + r = new AllocateResponsePBImpl(p); + + // check value + Assert.assertEquals(aggregatorAddr, r.getAggregatorAddr()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 18040b4..7148837 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -142,6 +142,7 @@ <include>yarn_server_common_service_protos.proto</include> <include>ResourceTracker.proto</include> <include>SCMUploader.proto</include> + <include>aggregatornodemanager_protocol.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java new file mode 100644 index 0000000..53bdb4e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; + +/** + * <p>The protocol between a <code>TimelineAggregatorsCollection</code> and a + * <code>NodeManager</code> to report that a new application aggregator has been launched. + * </p> + * + */ +@Private +public interface AggregatorNodemanagerProtocol { + + /** + * + * <p> + * The <code>TimelineAggregatorsCollection</code> provides a list of mapping + * between application and aggregator's address in + * {@link ReportNewAggregatorsInfoRequest} to a <code>NodeManager</code> to + * <em>register</em> aggregator's info, including: applicationId and REST URI to + * access aggregator. NodeManager will add them into registered aggregators + * and register them into <code>ResourceManager</code> afterwards. 
+ * </p> + * + * @param request the request of registering a new aggregator or a list of aggregators + * @return + * @throws YarnException + * @throws IOException + */ + ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) + throws YarnException, IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java new file mode 100644 index 0000000..4df80a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService; + +@Private +@Unstable +@ProtocolInfo( + protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB", + protocolVersion = 1) +public interface AggregatorNodemanagerProtocolPB extends + AggregatorNodemanagerProtocolService.BlockingInterface { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java new file mode 100644 index 0000000..6e777e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl; + +import com.google.protobuf.ServiceException; + +public class AggregatorNodemanagerProtocolPBClientImpl implements + AggregatorNodemanagerProtocol, Closeable { + + // Not a documented config. 
Only used for tests internally + static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX + + "rpc.nm-command-timeout"; + + /** + * Maximum of 1 minute timeout for a Node to react to the command + */ + static final int DEFAULT_COMMAND_TIMEOUT = 60000; + + private AggregatorNodemanagerProtocolPB proxy; + + @Private + public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class, + ProtobufRpcEngine.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); + proxy = + (AggregatorNodemanagerProtocolPB) RPC.getProxy( + AggregatorNodemanagerProtocolPB.class, + clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), expireIntvl); + } + + @Override + public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) throws YarnException, IOException { + + ReportNewAggregatorsInfoRequestProto requestProto = + ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto(); + try { + return new ReportNewAggregatorsInfoResponsePBImpl( + proxy.reportNewAggregatorInfo(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java new file mode 100644 index 0000000..87bce16 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class AggregatorNodemanagerProtocolPBServiceImpl implements + AggregatorNodemanagerProtocolPB { + + private AggregatorNodemanagerProtocol real; + + public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) { + this.real = impl; + } + + @Override + public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo( + RpcController arg0, ReportNewAggregatorsInfoRequestProto proto) + throws ServiceException { + ReportNewAggregatorsInfoRequestPBImpl request = + new ReportNewAggregatorsInfoRequestPBImpl(proto); + try { + ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request); + return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 84ca8a4..09cb9e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -41,6 +43,22 @@ public abstract class NodeHeartbeatRequest { nodeHeartbeatRequest.setNodeLabels(nodeLabels); return nodeHeartbeatRequest; } + + public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, + MasterKey lastKnownContainerTokenMasterKey, + MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels, + Map<ApplicationId, String> registeredAggregators) { + NodeHeartbeatRequest nodeHeartbeatRequest = + Records.newRecord(NodeHeartbeatRequest.class); + nodeHeartbeatRequest.setNodeStatus(nodeStatus); + nodeHeartbeatRequest + 
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); + nodeHeartbeatRequest + .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); + nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators); + return nodeHeartbeatRequest; + } public abstract NodeStatus getNodeStatus(); public abstract void setNodeStatus(NodeStatus status); @@ -59,4 +77,8 @@ public abstract class NodeHeartbeatRequest { public abstract void setLogAggregationReportsForApps( List<LogAggregationReport> logAggregationReportsForApps); + + // This tells RM registered aggregators' address info on this node + public abstract Map<ApplicationId, String> getRegisteredAggregators(); + public abstract void setRegisteredAggregators(Map<ApplicationId, String> appAggregatorsMap); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 1498a0c..7019ad7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -35,6 +35,10 @@ public interface NodeHeartbeatResponse { List<ContainerId> getContainersToBeRemovedFromNM(); List<ApplicationId> 
getApplicationsToCleanup(); + + // This tells NM the aggregators' address info of related Apps + Map<ApplicationId, String> getAppAggregatorsMap(); + void setAppAggregatorsMap(Map<ApplicationId, String> appAggregatorsMap); void setResponseId(int responseId); void setNodeAction(NodeAction action); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java new file mode 100644 index 0000000..ae538a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.List; +import java.util.Arrays; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.util.Records; + +@Private +public abstract class ReportNewAggregatorsInfoRequest { + + public static ReportNewAggregatorsInfoRequest newInstance( + List<AppAggregatorsMap> appAggregatorsList) { + ReportNewAggregatorsInfoRequest request = + Records.newRecord(ReportNewAggregatorsInfoRequest.class); + request.setAppAggregatorsList(appAggregatorsList); + return request; + } + + public static ReportNewAggregatorsInfoRequest newInstance( + ApplicationId id, String aggregatorAddr) { + ReportNewAggregatorsInfoRequest request = + Records.newRecord(ReportNewAggregatorsInfoRequest.class); + request.setAppAggregatorsList( + Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr))); + return request; + } + + public abstract List<AppAggregatorsMap> getAppAggregatorsList(); + + public abstract void setAppAggregatorsList( + List<AppAggregatorsMap> appAggregatorsList); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java new file mode 100644 index 0000000..3b847d6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.util.Records; + +public abstract class ReportNewAggregatorsInfoResponse { + + @Private + public static ReportNewAggregatorsInfoResponse newInstance() { + ReportNewAggregatorsInfoResponse response = + Records.newRecord(ReportNewAggregatorsInfoResponse.class); + return response; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 0a9895e..02e0a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -19,16 +19,22 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import 
org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; @@ -52,6 +58,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private Set<NodeLabel> labels = null; private List<LogAggregationReport> logAggregationReportsForApps = null; + Map<ApplicationId, String> registeredAggregators = null; + public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); } @@ -106,6 +114,9 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } + if (this.registeredAggregators != null) { + addRegisteredAggregatorsToProto(); + } } private void addLogAggregationStatusForAppsToProto() { @@ -146,6 +157,16 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { LogAggregationReport value) { return ((LogAggregationReportPBImpl) value).getProto(); } + + private void addRegisteredAggregatorsToProto() { + maybeInitBuilder(); + builder.clearRegisteredAggregators(); + for (Map.Entry<ApplicationId, String> entry : registeredAggregators.entrySet()) { + builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setAppAggregatorAddr(entry.getValue())); + } + } private void mergeLocalToProto() { if (viaProto) @@ 
-227,6 +248,36 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { builder.clearLastKnownNmTokenMasterKey(); this.lastKnownNMTokenMasterKey = masterKey; } + + @Override + public Map<ApplicationId, String> getRegisteredAggregators() { + if (this.registeredAggregators != null) { + return this.registeredAggregators; + } + initRegisteredAggregators(); + return registeredAggregators; + } + + private void initRegisteredAggregators() { + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + List<AppAggregatorsMapProto> list = p.getRegisteredAggregatorsList(); + this.registeredAggregators = new HashMap<ApplicationId, String> (); + for (AppAggregatorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.registeredAggregators.put(appId, c.getAppAggregatorAddr()); + } + } + + @Override + public void setRegisteredAggregators( + Map<ApplicationId, String> registeredAggregators) { + if (registeredAggregators == null || registeredAggregators.isEmpty()) { + return; + } + maybeInitBuilder(); + this.registeredAggregators = new HashMap<ApplicationId, String>(); + this.registeredAggregators.putAll(registeredAggregators); + } private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { return new NodeStatusPBImpl(p); @@ -235,6 +286,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private NodeStatusProto convertToProtoFormat(NodeStatus t) { return ((NodeStatusPBImpl)t).getProto(); } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { return new MasterKeyPBImpl(p); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index e27d8ca..197245c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; @@ -55,6 +56,8 @@ public class NodeHeartbeatResponsePBImpl extends private List<ContainerId> containersToBeRemovedFromNM = null; private List<ApplicationId> applicationsToCleanup = null; private Map<ApplicationId, 
ByteBuffer> systemCredentials = null; + + Map<ApplicationId, String> appAggregatorsMap = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -96,6 +99,10 @@ public class NodeHeartbeatResponsePBImpl extends if (this.systemCredentials != null) { addSystemCredentialsToProto(); } + + if (this.appAggregatorsMap != null) { + addAppAggregatorsMapToProto(); + } } private void addSystemCredentialsToProto() { @@ -108,6 +115,16 @@ public class NodeHeartbeatResponsePBImpl extends entry.getValue().duplicate()))); } } + + private void addAppAggregatorsMapToProto() { + maybeInitBuilder(); + builder.clearAppAggregatorsMap(); + for (Map.Entry<ApplicationId, String> entry : appAggregatorsMap.entrySet()) { + builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setAppAggregatorAddr(entry.getValue())); + } + } private void mergeLocalToProto() { if (viaProto) @@ -417,6 +434,15 @@ public class NodeHeartbeatResponsePBImpl extends initSystemCredentials(); return systemCredentials; } + + @Override + public Map<ApplicationId, String> getAppAggregatorsMap() { + if (this.appAggregatorsMap != null) { + return this.appAggregatorsMap; + } + initAppAggregatorsMap(); + return appAggregatorsMap; + } private void initSystemCredentials() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; @@ -428,6 +454,16 @@ public class NodeHeartbeatResponsePBImpl extends this.systemCredentials.put(appId, byteBuffer); } } + + private void initAppAggregatorsMap() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List<AppAggregatorsMapProto> list = p.getAppAggregatorsMapList(); + this.appAggregatorsMap = new HashMap<ApplicationId, String> (); + for (AppAggregatorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr()); + } + } @Override public void setSystemCredentialsForApps( @@ -439,6 +475,17 @@ public class NodeHeartbeatResponsePBImpl extends this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>(); this.systemCredentials.putAll(systemCredentials); } + + @Override + public void setAppAggregatorsMap( + Map<ApplicationId, String> appAggregatorsMap) { + if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) { + return; + } + maybeInitBuilder(); + this.appAggregatorsMap = new HashMap<ApplicationId, String>(); + this.appAggregatorsMap.putAll(appAggregatorsMap); + } @Override public long getNextHeartBeatInterval() {
