YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. Contributed by Sangjin Lee
(cherry picked from commit dda84085cabd8fdf143b380e54e1730802fd9912) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63c7210c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63c7210c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63c7210c Branch: refs/heads/YARN-2928 Commit: 63c7210c248be9a8e65b249b0593f1ace6003db5 Parents: 32acd9b Author: Junping Du <junping...@apache.org> Authored: Thu Mar 19 11:49:07 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue Aug 25 10:38:43 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 5 +- .../hadoop-yarn/hadoop-yarn-api/pom.xml | 4 + .../api/protocolrecords/AllocateResponse.java | 20 +- .../timelineservice/TimelineWriteResponse.java | 20 +- .../hadoop/yarn/conf/YarnConfiguration.java | 20 +- .../src/main/proto/yarn_service_protos.proto | 2 +- .../pom.xml | 10 + .../distributedshell/ApplicationMaster.java | 54 ++-- .../applications/distributedshell/Client.java | 8 +- .../distributedshell/TestDistributedShell.java | 10 +- .../hadoop/yarn/client/api/AMRMClient.java | 6 +- .../yarn/client/api/async/AMRMClientAsync.java | 4 +- .../api/async/impl/AMRMClientAsyncImpl.java | 20 +- .../impl/pb/AllocateResponsePBImpl.java | 16 +- .../hadoop/yarn/client/api/TimelineClient.java | 2 +- .../client/api/impl/TimelineClientImpl.java | 32 +-- .../src/main/resources/yarn-default.xml | 14 +- .../hadoop/yarn/TestContainerLaunchRPC.java | 2 +- .../hadoop/yarn/api/TestAllocateResponse.java | 12 +- .../hadoop-yarn-server-common/pom.xml | 2 +- .../api/AggregatorNodemanagerProtocol.java | 56 ---- .../api/AggregatorNodemanagerProtocolPB.java | 33 --- .../api/CollectorNodemanagerProtocol.java | 57 ++++ .../api/CollectorNodemanagerProtocolPB.java | 33 +++ ...gregatorNodemanagerProtocolPBClientImpl.java | 94 ------- ...ollectorNodemanagerProtocolPBClientImpl.java | 94 +++++++ ...regatorNodemanagerProtocolPBServiceImpl.java | 61 ---- ...llectorNodemanagerProtocolPBServiceImpl.java | 59 ++++ .../protocolrecords/NodeHeartbeatRequest.java | 13 +- .../protocolrecords/NodeHeartbeatResponse.java | 8 +- .../ReportNewAggregatorsInfoRequest.java | 53 ---- .../ReportNewAggregatorsInfoResponse.java | 32 --- .../ReportNewCollectorInfoRequest.java | 53 ++++ .../ReportNewCollectorInfoResponse.java | 32 +++ .../impl/pb/NodeHeartbeatRequestPBImpl.java | 58 ++-- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 60 ++-- .../ReportNewAggregatorsInfoRequestPBImpl.java | 142 ---------- .../ReportNewAggregatorsInfoResponsePBImpl.java | 74 ----- .../pb/ReportNewCollectorInfoRequestPBImpl.java | 142 ++++++++++ .../ReportNewCollectorInfoResponsePBImpl.java | 74 +++++ .../server/api/records/AppAggregatorsMap.java | 33 --- .../server/api/records/AppCollectorsMap.java | 46 +++ .../impl/pb/AppAggregatorsMapPBImpl.java | 151 ---------- .../records/impl/pb/AppCollectorsMapPBImpl.java | 151 ++++++++++ .../proto/aggregatornodemanager_protocol.proto | 29 -- .../proto/collectornodemanager_protocol.proto | 29 ++ .../yarn_server_common_service_protos.proto | 18 +- .../java/org/apache/hadoop/yarn/TestRPC.java | 116 ++++---- .../hadoop/yarn/TestYarnServerApiClasses.java | 24 +- .../hadoop/yarn/server/nodemanager/Context.java | 14 +- .../yarn/server/nodemanager/NodeManager.java | 56 ++-- .../nodemanager/NodeStatusUpdaterImpl.java | 11 +- .../aggregatormanager/NMAggregatorService.java | 113 -------- .../collectormanager/NMCollectorService.java | 110 ++++++++ .../application/ApplicationImpl.java | 9 +- .../ApplicationMasterService.java | 12 +- .../resourcemanager/ResourceTrackerService.java | 72 ++--- .../server/resourcemanager/rmapp/RMApp.java | 22 +- .../rmapp/RMAppAggregatorUpdateEvent.java | 36 --- .../rmapp/RMAppCollectorUpdateEvent.java | 37 +++ .../resourcemanager/rmapp/RMAppEventType.java | 4 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 60 ++-- .../applicationsmanager/MockAsm.java | 6 +- .../server/resourcemanager/rmapp/MockRMApp.java | 8 +- .../hadoop-yarn-server-tests/pom.xml | 5 + .../TestTimelineServiceClientIntegration.java | 52 +++- .../hadoop-yarn-server-timelineservice/pom.xml | 10 + .../aggregator/AppLevelTimelineAggregator.java | 57 ---- .../PerNodeTimelineAggregatorsAuxService.java | 211 -------------- .../aggregator/TimelineAggregator.java | 122 -------- .../TimelineAggregatorWebService.java | 180 ------------ .../TimelineAggregatorsCollection.java | 271 ------------------ .../collector/AppLevelTimelineCollector.java | 57 ++++ .../PerNodeTimelineCollectorsAuxService.java | 214 ++++++++++++++ .../collector/TimelineCollector.java | 122 ++++++++ .../collector/TimelineCollectorManager.java | 278 +++++++++++++++++++ .../collector/TimelineCollectorWebService.java | 183 ++++++++++++ .../storage/FileSystemTimelineWriterImpl.java | 6 +- .../TestAppLevelTimelineAggregator.java | 23 -- ...estPerNodeTimelineAggregatorsAuxService.java | 150 ---------- .../TestTimelineAggregatorsCollection.java | 109 -------- .../TestAppLevelTimelineCollector.java | 23 ++ ...TestPerNodeTimelineCollectorsAuxService.java | 164 +++++++++++ .../collector/TestTimelineCollectorManager.java | 118 ++++++++ .../TestFileSystemTimelineWriterImpl.java | 43 ++- 85 files changed, 2588 insertions(+), 2468 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index aa73ce3..42da7bf 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -32,13 +32,16 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3039. Implemented the app-level timeline aggregator discovery service. (Junping Du via zjshen) + YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. (Sangjin Lee + via junping_du) + IMPROVEMENTS OPTIMIZATIONS BUG FIXES -Trunk - Unreleased +Trunk - Unreleased INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml index 5c4156b..ed74a44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml @@ -45,6 +45,10 @@ <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> </dependency> + <dependency> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 421c2a0..6703249 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -120,7 +120,7 @@ public abstract class AllocateResponse { response.setAMRMToken(amRMToken); return response; } - + @Public @Unstable public static AllocateResponse newInstance(int responseId, @@ -130,13 +130,13 @@ public abstract class AllocateResponse { PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, List<ContainerResourceIncrease> increasedContainers, List<ContainerResourceDecrease> decreasedContainers, - String aggregatorAddr) { + String collectorAddr) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, nmTokens, increasedContainers, decreasedContainers); response.setAMRMToken(amRMToken); - response.setAggregatorAddr(aggregatorAddr); + response.setCollectorAddr(collectorAddr); return response; } @@ -323,18 +323,18 @@ public abstract class AllocateResponse { @Private @Unstable public abstract void setAMRMToken(Token amRMToken); - + /** - * The address of aggregator that belong to this app + * The address of collector that belong to this app * - * @return The address of aggregator that belong to this attempt + * @return The address of collector that belong to this attempt */ @Public @Unstable - public abstract String getAggregatorAddr(); - + public abstract String getCollectorAddr(); + @Private @Unstable - public abstract void setAggregatorAddr(String aggregatorAddr); - + public abstract void setCollectorAddr(String collectorAddr); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java index 82ecdbd..4739d8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java @@ -47,7 +47,7 @@ public class TimelineWriteResponse { /** * Get a list of {@link TimelineWriteError} instances - * + * * @return a list of {@link TimelineWriteError} instances */ @XmlElement(name = "errors") @@ -57,7 +57,7 @@ public class TimelineWriteResponse { /** * Add a single {@link TimelineWriteError} instance into the existing list - * + * * @param error * a single {@link TimelineWriteError} instance */ @@ -67,7 +67,7 @@ public class TimelineWriteResponse { /** * Add a list of {@link TimelineWriteError} instances into the existing list - * + * * @param errors * a list of {@link TimelineWriteError} instances */ @@ -77,7 +77,7 @@ public class TimelineWriteResponse { /** * Set the list to the given list of {@link TimelineWriteError} instances - * + * * @param errors * a list of {@link TimelineWriteError} instances */ @@ -107,7 +107,7 @@ public class TimelineWriteResponse { /** * Get the entity Id - * + * * @return the entity Id */ @XmlElement(name = "entity") @@ -117,7 +117,7 @@ public class TimelineWriteResponse { /** * Set the entity Id - * + * * @param entityId * the entity Id */ @@ -127,7 +127,7 @@ public class TimelineWriteResponse { /** * Get the entity type - * + * * @return the entity type */ @XmlElement(name = "entitytype") @@ -137,7 +137,7 @@ public class TimelineWriteResponse { /** * Set the entity type - * + * * @param entityType * the entity type */ @@ -147,7 +147,7 @@ public class TimelineWriteResponse { /** * Get the error code - * + * * @return an error code */ @XmlElement(name = "errorcode") @@ -157,7 +157,7 @@ public class TimelineWriteResponse { /** * Set the error code to the given error code - * + * * @param errorCode * an error code */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6874eec..9d5b63b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -672,10 +672,10 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; /** Number of threads container manager uses.*/ - public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT = - NM_PREFIX + "aggregator-service.thread-count"; - public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5; - + public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT = + NM_PREFIX + "collector-service.thread-count"; + public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5; + /** Number of threads used in cleanup.*/ public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; @@ -703,13 +703,13 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; - /** Address where the aggregator service IPC is.*/ - public static final String NM_AGGREGATOR_SERVICE_ADDRESS = - NM_PREFIX + "aggregator-service.address"; - public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048; - public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS = + /** Address where the collector service IPC is.*/ + public static final String NM_COLLECTOR_SERVICE_ADDRESS = + NM_PREFIX + "collector-service.address"; + public static final int DEFAULT_NM_COLLECTOR_SERVICE_PORT = 8048; + public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; - + /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 40b8bfa..23d6e75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -87,7 +87,7 @@ message AllocateResponseProto { repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; - optional string aggregator_addr = 13; + optional string collector_addr = 13; } enum SchedulerResourceTypes { http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index 09a56ea..713f12b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -70,6 +70,16 @@ <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <scope>test</scope> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b728502..e107731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -39,10 +39,10 @@ import java.util.Set; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -216,12 +216,12 @@ public class ApplicationMaster { private int appMasterRpcPort = -1; // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; - + private boolean newTimelineService = false; - + // For posting entities in new timeline service in a non-blocking way // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool = + private static ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") .build()); @@ -314,9 +314,9 @@ public class ApplicationMaster { } appMaster.run(); result = appMaster.finish(); - + threadPool.shutdown(); - + while (!threadPool.isTerminated()) { // wait for all posting thread to finish try { if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { @@ -398,7 +398,7 @@ public class ApplicationMaster { "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); opts.addOption("debug", false, "Dump out debug information"); - opts.addOption("timeline_service_version", true, + opts.addOption("timeline_service_version", true, "Version for timeline service"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -539,7 +539,7 @@ public class ApplicationMaster { if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { if (cliParser.hasOption("timeline_service_version")) { - String timelineServiceVersion = + String timelineServiceVersion = cliParser.getOptionValue("timeline_service_version", "v1"); if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { newTimelineService = false; @@ -605,12 +605,12 @@ public class ApplicationMaster { appSubmitterUgi = UserGroupInformation.createRemoteUser(appSubmitterUserName); appSubmitterUgi.addCredentials(credentials); - + AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); - + containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); @@ -795,7 +795,7 @@ public class ApplicationMaster { if(timelineClient != null) { timelineClient.stop(); } - + return success; } @@ -1291,11 +1291,11 @@ public class ApplicationMaster { } private static void publishContainerStartEventOnNewTimelineService( - final TimelineClient timelineClient, final Container container, + final TimelineClient timelineClient, final Container container, final String domainId, final UserGroupInformation ugi) { Runnable publishWrapper = new Runnable() { public void run() { - publishContainerStartEventOnNewTimelineServiceBase(timelineClient, + publishContainerStartEventOnNewTimelineServiceBase(timelineClient, container, domainId, ugi); } }; @@ -1305,14 +1305,14 @@ public class ApplicationMaster { private static void publishContainerStartEventOnNewTimelineServiceBase( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi) { - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); entity.addInfo("user", ugi.getShortUserName()); - - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setId(DSEvent.DS_CONTAINER_START.toString()); @@ -1334,29 +1334,29 @@ public class ApplicationMaster { e instanceof UndeclaredThrowableException ? e.getCause() : e); } } - + private static void publishContainerEndEventOnNewTimelineService( final TimelineClient timelineClient, final ContainerStatus container, final String domainId, final UserGroupInformation ugi) { Runnable publishWrapper = new Runnable() { public void run() { - publishContainerEndEventOnNewTimelineServiceBase(timelineClient, + publishContainerEndEventOnNewTimelineServiceBase(timelineClient, container, domainId, ugi); } }; threadPool.execute(publishWrapper); } - + private static void publishContainerEndEventOnNewTimelineServiceBase( final TimelineClient timelineClient, final ContainerStatus container, final String domainId, final UserGroupInformation ugi) { - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); //entity.setDomainId(domainId); entity.addInfo("user", ugi.getShortUserName()); - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setId(DSEvent.DS_CONTAINER_END.toString()); @@ -1381,28 +1381,28 @@ public class ApplicationMaster { private static void publishApplicationAttemptEventOnNewTimelineService( final TimelineClient timelineClient, final String appAttemptId, - final DSEvent appEvent, final String domainId, + final DSEvent appEvent, final String domainId, final UserGroupInformation ugi) { - + Runnable publishWrapper = new Runnable() { public void run() { - publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, + publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, appAttemptId, appEvent, domainId, ugi); } }; threadPool.execute(publishWrapper); } - + private static void publishApplicationAttemptEventOnNewTimelineServiceBase( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(appAttemptId); entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); //entity.setDomainId(domainId); entity.addInfo("user", ugi.getShortUserName()); - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); event.setId(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 9f7f9c0..163f8f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -185,7 +185,7 @@ public class Client { // Command line options private Options opts; - + private String timelineServiceVersion; private static final String shellCommandPath = "shellCommands"; @@ -357,11 +357,11 @@ public class Client { throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." + " Specified virtual cores=" + amVCores); } - + if (cliParser.hasOption("timeline_service_version")) { - timelineServiceVersion = + timelineServiceVersion = cliParser.getOptionValue("timeline_service_version", "v1"); - if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || + if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || timelineServiceVersion.trim().equalsIgnoreCase("v2"))) { throw new IllegalArgumentException( "timeline_service_version is not set properly, should be 'v1' or 'v2'"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 97d9168..0af050c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -50,7 +50,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.junit.After; import org.junit.Assert; @@ -67,7 +67,7 @@ public class TestDistributedShell { protected MiniYARNCluster yarnCluster = null; protected YarnConfiguration conf = null; private static final int NUM_NMS = 1; - private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_aggregator"; + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @@ -91,14 +91,14 @@ public class TestDistributedShell { // mark if we need to launch the v1 timeline server boolean enableATSV1 = false; if (!currTestName.getMethodName().toLowerCase().contains("v2")) { - // disable aux-service based timeline aggregators + // disable aux-service based timeline collectors conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); enableATSV1 = true; } else { - // enable aux-service based timeline aggregators + // enable aux-service based timeline collectors conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME - + ".class", PerNodeTimelineAggregatorsAuxService.class.getName()); + + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); } conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 56f2b10..afb2e09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -47,7 +47,7 @@ import com.google.common.collect.ImmutableList; public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClient.class); - + private TimelineClient timelineClient; /** @@ -382,7 +382,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends public void registerTimelineClient(TimelineClient timelineClient) { this.timelineClient = timelineClient; } - + /** * Get registered timeline client. * @return @@ -390,7 +390,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends public TimelineClient getRegisteredTimeineClient() { return this.timelineClient; } - + /** * Wait for <code>check</code> to return true for each 1000 ms. * See also {@link #waitFor(com.google.common.base.Supplier, int)} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index be5610e..1a5c257 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -194,7 +194,7 @@ extends AbstractService { * @return Current number of nodes in the cluster */ public abstract int getClusterNodeCount(); - + /** * Register TimelineClient to AMRMClient. * @param timelineClient @@ -202,7 +202,7 @@ extends AbstractService { public void registerTimelineClient(TimelineClient timelineClient) { client.registerTimelineClient(timelineClient); } - + /** * Get registered timeline client. * @return http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index f0f0bc9..5351ef5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -66,8 +66,8 @@ extends AMRMClientAsync<T> { private volatile boolean keepRunning; private volatile float progress; - private volatile String aggregatorAddr; - + private volatile String collectorAddr; + private volatile Throwable savedException; public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) { @@ -307,15 +307,15 @@ extends AMRMClientAsync<T> { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } - - String aggregatorAddress = response.getAggregatorAddr(); + + String collectorAddress = response.getCollectorAddr(); TimelineClient timelineClient = client.getRegisteredTimeineClient(); - if (timelineClient != null && aggregatorAddress != null - && !aggregatorAddress.isEmpty()) { - if (aggregatorAddr == null || - !aggregatorAddr.equals(aggregatorAddress)) { - aggregatorAddr = aggregatorAddress; - timelineClient.setTimelineServiceAddress(aggregatorAddress); + if (timelineClient != null && collectorAddress != null + && !collectorAddress.isEmpty()) { + if (collectorAddr == null || + !collectorAddr.equals(collectorAddress)) { + collectorAddr = collectorAddress; + timelineClient.setTimelineServiceAddress(collectorAddress); } } progress = handler.getProgress(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 605c29a..8dc473c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -384,22 +384,22 @@ public class AllocateResponsePBImpl extends AllocateResponse { } this.amrmToken = amRMToken; } - + @Override - public String getAggregatorAddr() { + public String getCollectorAddr() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - return p.getAggregatorAddr(); + return p.getCollectorAddr(); } - + @Override - public void setAggregatorAddr(String aggregatorAddr) { + public void setCollectorAddr(String collectorAddr) { maybeInitBuilder(); - if (aggregatorAddr == null) { - builder.clearAggregatorAddr(); + if (collectorAddr == null) { + builder.clearCollectorAddr(); return; } - builder.setAggregatorAddr(aggregatorAddr); + builder.setCollectorAddr(collectorAddr); } private synchronized void initLocalIncreasedContainerList() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 27ef3e4..f2707ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -195,5 +195,5 @@ public abstract class TimelineClient extends AbstractService { * the timeline service address */ public abstract void setTimelineServiceAddress(String address); - + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 8992422..c722edf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -116,14 +116,14 @@ public class TimelineClientImpl extends TimelineClient { private URI resURI; private UserGroupInformation authUgi; private String doAsUser; - + private volatile String timelineServiceAddress; - + // Retry parameters for identifying new timeline service // TODO consider to merge with connection retry private int maxServiceRetries; private long serviceRetryInterval; - + private boolean newTimelineService = false; @Private @@ -323,7 +323,7 @@ public class TimelineClientImpl extends TimelineClient { YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)); } LOG.info("Timeline service address: " + getTimelineServiceAddress()); - } + } super.serviceInit(conf); } @@ -374,16 +374,16 @@ public class TimelineClientImpl extends TimelineClient { YarnException { doPosting(domain, "domain"); } - + // Used for new timeline service only @Private - public void putObjects(String path, MultivaluedMap<String, String> params, + public void putObjects(String path, MultivaluedMap<String, String> params, Object obj) throws IOException, YarnException { - - // timelineServiceAddress could haven't be initialized yet + + // timelineServiceAddress could haven't be initialized yet // or stale (only for new timeline service) int retries = pollTimelineServiceAddress(this.maxServiceRetries); - + // timelineServiceAddress could be stale, add retry logic here. boolean needRetry = true; while (needRetry) { @@ -400,13 +400,13 @@ public class TimelineClientImpl extends TimelineClient { } } } - + /** * Check if reaching to maximum of retries. * @param retries * @param e */ - private void checkRetryWithSleep(int retries, Exception e) throws + private void checkRetryWithSleep(int retries, Exception e) throws YarnException, IOException { if (retries > 0) { try { @@ -416,8 +416,8 @@ public class TimelineClientImpl extends TimelineClient { } } else { LOG.error( - "TimelineClient has reached to max retry times :" + - this.maxServiceRetries + " for service address: " + + "TimelineClient has reached to max retry times :" + + this.maxServiceRetries + " for service address: " + timelineServiceAddress); if (e instanceof YarnException) { throw (YarnException)e; @@ -488,12 +488,12 @@ public class TimelineClientImpl extends TimelineClient { } return resp; } - + @Override public void setTimelineServiceAddress(String address) { this.timelineServiceAddress = address; } - + private String getTimelineServiceAddress() { return this.timelineServiceAddress; } @@ -642,7 +642,7 @@ public class TimelineClientImpl extends TimelineClient { throw new YarnRuntimeException("Unknown resource type"); } } - + /** * Poll TimelineServiceAddress for maximum of retries times if it is null * @param retries http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 85525a2..78b6ae8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -927,10 +927,10 @@ <name>yarn.nodemanager.container-manager.thread-count</name> <value>20</value> </property> - + <property> - <description>Number of threads aggregator service uses.</description> - <name>yarn.nodemanager.aggregator-service.thread-count</name> + <description>Number of threads collector service uses.</description> + <name>yarn.nodemanager.collector-service.thread-count</name> <value>5</value> </property> @@ -1001,11 +1001,11 @@ <name>yarn.nodemanager.localizer.address</name> <value>${yarn.nodemanager.hostname}:8040</value> </property> - - + + <property> - <description>Address where the aggregator service IPC is.</description> - <name>yarn.nodemanager.aggregator-service.address</name> + <description>Address where the collector service IPC is.</description> + <name>yarn.nodemanager.collector-service.address</name> <value>${yarn.nodemanager.hostname}:8048</value> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 26d6d04..515adc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -131,7 +131,7 @@ public class TestContainerLaunchRPC { Assert.fail("timeout exception should have occurred!"); } - + public static Token newContainerToken(NodeId nodeId, byte[] password, ContainerTokenIdentifier tokenIdentifier) { // RPC layer client expects ip:port as service for tokens http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index ef0bdcc..5af0f11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -111,21 +111,21 @@ public class TestAllocateResponse { Assert.assertEquals(0, r.getIncreasedContainers().size()); Assert.assertEquals(0, r.getDecreasedContainers().size()); } - + @SuppressWarnings("deprecation") @Test - public void testAllocateResponseWithAggregatorAddress() { - final String aggregatorAddr = "localhost:0"; + public void testAllocateResponseWithCollectorAddress() { + final String collectorAddr = "localhost:0"; AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(), new ArrayList<Container>(), new ArrayList<NodeReport>(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, - null, null, aggregatorAddr); + AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, + null, null, collectorAddr); AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); r = new AllocateResponsePBImpl(p); // check value - Assert.assertEquals(aggregatorAddr, r.getAggregatorAddr()); + Assert.assertEquals(collectorAddr, r.getCollectorAddr()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 7148837..21ecd97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -142,7 +142,7 @@ <include>yarn_server_common_service_protos.proto</include> <include>ResourceTracker.proto</include> <include>SCMUploader.proto</include> - <include>aggregatornodemanager_protocol.proto</include> + <include>collectornodemanager_protocol.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java deleted file mode 100644 index 53bdb4e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.api; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; - -/** - * <p>The protocol between an <code>TimelineAggregatorsCollection</code> and a - * <code>NodeManager</code> to report a new application aggregator get launched. - * </p> - * - */ -@Private -public interface AggregatorNodemanagerProtocol { - - /** - * - * <p> - * The <code>TimelineAggregatorsCollection</code> provides a list of mapping - * between application and aggregator's address in - * {@link ReportNewAggregatorsInfoRequest} to a <code>NodeManager</code> to - * <em>register</em> aggregator's info, include: applicationId and REST URI to - * access aggregator. NodeManager will add them into registered aggregators - * and register them into <code>ResourceManager</code> afterwards. - * </p> - * - * @param request the request of registering a new aggregator or a list of aggregators - * @return - * @throws YarnException - * @throws IOException - */ - ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( - ReportNewAggregatorsInfoRequest request) - throws YarnException, IOException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java deleted file mode 100644 index 4df80a5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.api; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService; - -@Private -@Unstable -@ProtocolInfo( - protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB", - protocolVersion = 1) -public interface AggregatorNodemanagerProtocolPB extends - AggregatorNodemanagerProtocolService.BlockingInterface { - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java new file mode 100644 index 0000000..26c121a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; + +/** + * <p>The protocol between an <code>TimelineCollectorManager</code> and a + * <code>NodeManager</code> to report a new application collector get launched. + * </p> + * + */ +@Private +public interface CollectorNodemanagerProtocol { + + /** + * + * <p> + * The <code>TimelineCollectorManager</code> provides a list of mapping + * between application and collector's address in + * {@link ReportNewCollectorInfoRequest} to a <code>NodeManager</code> to + * <em>register</em> collector's info, include: applicationId and REST URI to + * access collector. NodeManager will add them into registered collectors + * and register them into <code>ResourceManager</code> afterwards. + * </p> + * + * @param request the request of registering a new collector or a list of + * collectors + * @return + * @throws YarnException + * @throws IOException + */ + ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) + throws YarnException, IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java new file mode 100644 index 0000000..655e989 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.CollectorNodemanagerProtocol.CollectorNodemanagerProtocolService; + +@Private +@Unstable +@ProtocolInfo( + protocolName = "org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB", + protocolVersion = 1) +public interface CollectorNodemanagerProtocolPB extends + CollectorNodemanagerProtocolService.BlockingInterface { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java deleted file mode 100644 index 6e777e7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.api.impl.pb.client; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; -import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; -import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl; - -import com.google.protobuf.ServiceException; - -public class AggregatorNodemanagerProtocolPBClientImpl implements - AggregatorNodemanagerProtocol, Closeable { - - // Not a documented config. Only used for tests internally - static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX - + "rpc.nm-command-timeout"; - - /** - * Maximum of 1 minute timeout for a Node to react to the command - */ - static final int DEFAULT_COMMAND_TIMEOUT = 60000; - - private AggregatorNodemanagerProtocolPB proxy; - - @Private - public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion, - InetSocketAddress addr, Configuration conf) throws IOException { - RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class, - ProtobufRpcEngine.class); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - - int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); - proxy = - (AggregatorNodemanagerProtocolPB) RPC.getProxy( - AggregatorNodemanagerProtocolPB.class, - clientVersion, addr, ugi, conf, - NetUtils.getDefaultSocketFactory(conf), expireIntvl); - } - - @Override - public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( - ReportNewAggregatorsInfoRequest request) throws YarnException, IOException { - - ReportNewAggregatorsInfoRequestProto requestProto = - ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto(); - try { - return new ReportNewAggregatorsInfoResponsePBImpl( - proxy.reportNewAggregatorInfo(null, requestProto)); - } catch (ServiceException e) { - RPCUtil.unwrapAndThrowException(e); - return null; - } - } - - @Override - public void close() { - if (this.proxy != null) { - RPC.stopProxy(this.proxy); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java new file mode 100644 index 0000000..276a540 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; + +import com.google.protobuf.ServiceException; + +public class CollectorNodemanagerProtocolPBClientImpl implements + CollectorNodemanagerProtocol, Closeable { + + // Not a documented config. Only used for tests internally + static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX + + "rpc.nm-command-timeout"; + + /** + * Maximum of 1 minute timeout for a Node to react to the command + */ + static final int DEFAULT_COMMAND_TIMEOUT = 60000; + + private CollectorNodemanagerProtocolPB proxy; + + @Private + public CollectorNodemanagerProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, CollectorNodemanagerProtocolPB.class, + ProtobufRpcEngine.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); + proxy = + (CollectorNodemanagerProtocolPB) RPC.getProxy( + CollectorNodemanagerProtocolPB.class, + clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), expireIntvl); + } + + @Override + public ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) throws YarnException, IOException { + + ReportNewCollectorInfoRequestProto requestProto = + ((ReportNewCollectorInfoRequestPBImpl) request).getProto(); + try { + return new ReportNewCollectorInfoResponsePBImpl( + proxy.reportNewCollectorInfo(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java deleted file mode 100644 index 87bce16..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.api.impl.pb.service; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto; -import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; -import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - -public class AggregatorNodemanagerProtocolPBServiceImpl implements - AggregatorNodemanagerProtocolPB { - - private AggregatorNodemanagerProtocol real; - - public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) { - this.real = impl; - } - - @Override - public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo( - RpcController arg0, ReportNewAggregatorsInfoRequestProto proto) - throws ServiceException { - ReportNewAggregatorsInfoRequestPBImpl request = - new ReportNewAggregatorsInfoRequestPBImpl(proto); - try { - ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request); - return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto(); - } catch (YarnException e) { - throw new ServiceException(e); - } catch (IOException e) { - throw new ServiceException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java new file mode 100644 index 0000000..3f42732 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class CollectorNodemanagerProtocolPBServiceImpl implements + CollectorNodemanagerProtocolPB { + + private CollectorNodemanagerProtocol real; + + public CollectorNodemanagerProtocolPBServiceImpl(CollectorNodemanagerProtocol impl) { + this.real = impl; + } + + @Override + public ReportNewCollectorInfoResponseProto reportNewCollectorInfo( + RpcController arg0, ReportNewCollectorInfoRequestProto proto) + throws ServiceException { + ReportNewCollectorInfoRequestPBImpl request = + new ReportNewCollectorInfoRequestPBImpl(proto); + try { + ReportNewCollectorInfoResponse response = real.reportNewCollectorInfo(request); + return ((ReportNewCollectorInfoResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 09cb9e3..c795e55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -43,11 +43,11 @@ public abstract class NodeHeartbeatRequest { nodeHeartbeatRequest.setNodeLabels(nodeLabels); return nodeHeartbeatRequest; } - + public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels, - Map<ApplicationId, String> registeredAggregators) { + Map<ApplicationId, String> registeredCollectors) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -56,7 +56,7 @@ public abstract class NodeHeartbeatRequest { nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); nodeHeartbeatRequest.setNodeLabels(nodeLabels); - nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators); + nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors); return nodeHeartbeatRequest; } @@ -78,7 +78,8 @@ public abstract class NodeHeartbeatRequest { public abstract void setLogAggregationReportsForApps( List<LogAggregationReport> logAggregationReportsForApps); - // This tells RM registered aggregators' address info on this node - public abstract Map<ApplicationId, String> getRegisteredAggregators(); - public abstract void setRegisteredAggregators(Map<ApplicationId, String> appAggregatorsMap); + // This tells RM registered collectors' address info on this node + public abstract Map<ApplicationId, String> getRegisteredCollectors(); + public abstract void setRegisteredCollectors(Map<ApplicationId, + String> appCollectorsMap); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 7019ad7..8d28d19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -35,10 +35,10 @@ public interface NodeHeartbeatResponse { List<ContainerId> getContainersToBeRemovedFromNM(); List<ApplicationId> getApplicationsToCleanup(); - - // This tells NM the aggregators' address info of related Apps - Map<ApplicationId, String> getAppAggregatorsMap(); - void setAppAggregatorsMap(Map<ApplicationId, String> appAggregatorsMap); + + // This tells NM the collectors' address info of related apps + Map<ApplicationId, String> getAppCollectorsMap(); + void setAppCollectorsMap(Map<ApplicationId, String> appCollectorsMap); void setResponseId(int responseId); void setNodeAction(NodeAction action); http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java deleted file mode 100644 index ae538a2..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.api.protocolrecords; - -import java.util.List; -import java.util.Arrays; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; -import org.apache.hadoop.yarn.util.Records; - -@Private -public abstract class ReportNewAggregatorsInfoRequest { - - public static ReportNewAggregatorsInfoRequest newInstance( - List<AppAggregatorsMap> appAggregatorsList) { - ReportNewAggregatorsInfoRequest request = - Records.newRecord(ReportNewAggregatorsInfoRequest.class); - request.setAppAggregatorsList(appAggregatorsList); - return request; - } - - public static ReportNewAggregatorsInfoRequest newInstance( - ApplicationId id, String aggregatorAddr) { - ReportNewAggregatorsInfoRequest request = - Records.newRecord(ReportNewAggregatorsInfoRequest.class); - request.setAppAggregatorsList( - Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr))); - return request; - } - - public abstract List<AppAggregatorsMap> getAppAggregatorsList(); - - public abstract void setAppAggregatorsList( - List<AppAggregatorsMap> appAggregatorsList); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c7210c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java deleted file mode 100644 index 3b847d6..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.api.protocolrecords; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.util.Records; - -public abstract class ReportNewAggregatorsInfoResponse { - - @Private - public static ReportNewAggregatorsInfoResponse newInstance() { - ReportNewAggregatorsInfoResponse response = - Records.newRecord(ReportNewAggregatorsInfoResponse.class); - return response; - } - -}