YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bd32c28b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bd32c28b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bd32c28b Branch: refs/heads/YARN-5355-branch-2 Commit: bd32c28bab266a396007e0bbd30b41085154526b Parents: f30d338 Author: Sangjin Lee <sj...@apache.org> Authored: Thu Jul 14 09:48:26 2016 -0700 Committer: Varun Saxena <varunsax...@apache.org> Committed: Sun Nov 6 21:25:25 2016 +0530 ---------------------------------------------------------------------- .../src/site/markdown/Compatibility.md | 1 + .../jobhistory/JobHistoryEventHandler.java | 323 +- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 18 + .../mapreduce/v2/app/job/impl/JobImpl.java | 2 +- .../v2/app/rm/RMContainerAllocator.java | 10 + .../hadoop/mapreduce/jobhistory/TestEvents.java | 18 +- .../jobhistory/TestJobHistoryEventHandler.java | 38 +- .../v2/app/rm/TestRMContainerAllocator.java | 3 +- .../mapreduce/jobhistory/AMStartedEvent.java | 28 +- .../mapreduce/jobhistory/HistoryEvent.java | 18 + .../mapreduce/jobhistory/JobFinishedEvent.java | 33 + .../jobhistory/JobInfoChangeEvent.java | 21 +- .../mapreduce/jobhistory/JobInitedEvent.java | 25 +- .../jobhistory/JobPriorityChangeEvent.java | 21 +- .../jobhistory/JobQueueChangeEvent.java | 18 + .../jobhistory/JobStatusChangedEvent.java | 21 +- .../mapreduce/jobhistory/JobSubmittedEvent.java | 67 +- .../JobUnsuccessfulCompletionEvent.java | 27 +- .../jobhistory/MapAttemptFinishedEvent.java | 32 +- .../jobhistory/NormalizedResourceEvent.java | 19 + .../jobhistory/ReduceAttemptFinishedEvent.java | 33 +- .../jobhistory/TaskAttemptFinishedEvent.java | 26 + .../jobhistory/TaskAttemptStartedEvent.java | 29 +- .../TaskAttemptUnsuccessfulCompletionEvent.java | 37 +- .../mapreduce/jobhistory/TaskFailedEvent.java | 29 +- .../mapreduce/jobhistory/TaskFinishedEvent.java | 27 +- .../mapreduce/jobhistory/TaskStartedEvent.java 
| 20 + .../mapreduce/jobhistory/TaskUpdatedEvent.java | 21 +- .../mapreduce/util/JobHistoryEventUtils.java | 83 + .../hadoop-mapreduce-client-jobclient/pom.xml | 6 + .../mapred/TestMRTimelineEventHandling.java | 313 +- .../org/apache/hadoop/mapred/UtilsForTests.java | 42 +- .../apache/hadoop/mapreduce/EntityWriterV2.java | 57 + .../hadoop/mapreduce/JobHistoryFileParser.java | 3 + .../mapreduce/JobHistoryFileReplayMapperV1.java | 14 +- .../mapreduce/JobHistoryFileReplayMapperV2.java | 161 + .../mapreduce/SimpleEntityWriterConstants.java | 43 + .../hadoop/mapreduce/SimpleEntityWriterV1.java | 32 +- .../hadoop/mapreduce/SimpleEntityWriterV2.java | 131 + .../mapreduce/TimelineEntityConverterV1.java | 15 +- .../mapreduce/TimelineEntityConverterV2.java | 206 + .../mapreduce/TimelineServicePerformance.java | 130 +- .../hadoop/mapreduce/v2/MiniMRYarnCluster.java | 22 +- .../apache/hadoop/test/MapredTestDriver.java | 32 +- hadoop-project/pom.xml | 128 + hadoop-project/src/site/site.xml | 2 + hadoop-yarn-project/hadoop-yarn/bin/yarn | 3 + hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd | 8 +- .../dev-support/findbugs-exclude.xml | 27 + .../hadoop-yarn/hadoop-yarn-api/pom.xml | 4 + .../api/protocolrecords/AllocateResponse.java | 14 + .../api/records/timeline/TimelineEntity.java | 21 +- .../api/records/timeline/TimelineEvent.java | 8 +- .../ApplicationAttemptEntity.java | 41 + .../timelineservice/ApplicationEntity.java | 52 + .../records/timelineservice/ClusterEntity.java | 40 + .../timelineservice/ContainerEntity.java | 41 + .../timelineservice/FlowActivityEntity.java | 191 + .../records/timelineservice/FlowRunEntity.java | 126 + .../HierarchicalTimelineEntity.java | 133 + .../records/timelineservice/QueueEntity.java | 40 + .../timelineservice/TimelineEntities.java | 62 + .../records/timelineservice/TimelineEntity.java | 584 +++ .../timelineservice/TimelineEntityType.java | 101 + .../records/timelineservice/TimelineEvent.java | 133 + 
.../records/timelineservice/TimelineMetric.java | 289 ++ .../TimelineMetricCalculator.java | 115 + .../TimelineMetricOperation.java | 167 + .../timelineservice/TimelineWriteResponse.java | 167 + .../api/records/timelineservice/UserEntity.java | 40 + .../records/timelineservice/package-info.java | 26 + .../hadoop/yarn/conf/YarnConfiguration.java | 157 +- .../hadoop/yarn/util/TimelineServiceHelper.java | 49 + .../src/main/proto/yarn_service_protos.proto | 1 + .../timelineservice/TestTimelineMetric.java | 100 + .../yarn/conf/TestYarnConfigurationFields.java | 3 + .../pom.xml | 10 + .../distributedshell/ApplicationMaster.java | 177 +- .../applications/distributedshell/Client.java | 45 +- .../distributedshell/TestDistributedShell.java | 298 +- .../TestDistributedShellWithNodeLabels.java | 2 +- .../hadoop/yarn/client/api/AMRMClient.java | 20 +- .../yarn/client/api/async/AMRMClientAsync.java | 19 +- .../api/async/impl/AMRMClientAsyncImpl.java | 17 +- .../hadoop-yarn/hadoop-yarn-common/pom.xml | 1 + .../impl/pb/AllocateResponsePBImpl.java | 17 + .../hadoop/yarn/client/api/TimelineClient.java | 87 +- .../client/api/impl/TimelineClientImpl.java | 486 ++- .../yarn/util/timeline/TimelineUtils.java | 49 + .../hadoop/yarn/webapp/util/WebAppUtils.java | 8 +- .../src/main/resources/webapps/timeline/.keep | 0 .../src/main/resources/yarn-default.xml | 87 +- .../hadoop/yarn/TestContainerLaunchRPC.java | 16 +- .../yarn/TestContainerResourceIncreaseRPC.java | 16 +- .../java/org/apache/hadoop/yarn/TestRPC.java | 303 -- .../TestTimelineServiceRecords.java | 312 ++ .../api/impl/TestTimelineClientV2Impl.java | 378 ++ .../yarn/util/TestTimelineServiceHelper.java | 83 + .../hadoop-yarn-server-common/pom.xml | 1 + .../api/CollectorNodemanagerProtocol.java | 73 + .../api/CollectorNodemanagerProtocolPB.java | 34 + ...ollectorNodemanagerProtocolPBClientImpl.java | 114 + ...llectorNodemanagerProtocolPBServiceImpl.java | 82 + .../GetTimelineCollectorContextRequest.java | 37 + 
.../GetTimelineCollectorContextResponse.java | 51 + .../protocolrecords/NodeHeartbeatRequest.java | 23 + .../protocolrecords/NodeHeartbeatResponse.java | 4 + .../ReportNewCollectorInfoRequest.java | 53 + .../ReportNewCollectorInfoResponse.java | 32 + ...etTimelineCollectorContextRequestPBImpl.java | 132 + ...tTimelineCollectorContextResponsePBImpl.java | 159 + .../impl/pb/NodeHeartbeatRequestPBImpl.java | 62 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 47 + .../pb/ReportNewCollectorInfoRequestPBImpl.java | 144 + .../ReportNewCollectorInfoResponsePBImpl.java | 76 + .../server/api/records/AppCollectorsMap.java | 46 + .../records/impl/pb/AppCollectorsMapPBImpl.java | 152 + .../metrics/ContainerMetricsConstants.java | 18 + .../proto/collectornodemanager_protocol.proto | 30 + .../yarn_server_common_service_protos.proto | 31 + .../java/org/apache/hadoop/yarn/TestRPC.java | 454 +++ .../hadoop/yarn/TestYarnServerApiClasses.java | 17 + .../hadoop/yarn/server/nodemanager/Context.java | 12 + .../yarn/server/nodemanager/NodeManager.java | 57 + .../nodemanager/NodeStatusUpdaterImpl.java | 47 +- .../collectormanager/NMCollectorService.java | 145 + .../collectormanager/package-info.java | 28 + .../containermanager/ContainerManagerImpl.java | 110 +- .../application/Application.java | 5 + .../ApplicationContainerFinishedEvent.java | 18 +- .../application/ApplicationImpl.java | 91 +- .../containermanager/container/Container.java | 4 + .../container/ContainerImpl.java | 21 +- .../monitor/ContainersMonitorImpl.java | 33 +- .../timelineservice/NMTimelineEvent.java | 35 + .../timelineservice/NMTimelineEventType.java | 27 + .../timelineservice/NMTimelinePublisher.java | 405 ++ .../timelineservice/package-info.java | 29 + .../nodemanager/TestNodeStatusUpdater.java | 1 + .../amrmproxy/BaseAMRMProxyTest.java | 14 + .../TestContainerManagerRecovery.java | 11 +- .../application/TestApplication.java | 13 +- .../launcher/TestContainerLaunch.java | 1 - .../TestAppLogAggregatorImpl.java | 2 
+- .../TestNMTimelinePublisher.java | 157 + .../yarn/server/nodemanager/webapp/MockApp.java | 22 + .../nodemanager/webapp/MockContainer.java | 4 + .../webapp/TestContainerLogsPage.java | 6 +- .../nodemanager/webapp/TestNMWebServer.java | 8 +- .../hadoop-yarn-server-resourcemanager/pom.xml | 10 + .../ApplicationMasterService.java | 11 + .../server/resourcemanager/ClientRMService.java | 24 + .../resourcemanager/RMActiveServiceContext.java | 20 +- .../server/resourcemanager/RMAppManager.java | 28 +- .../yarn/server/resourcemanager/RMContext.java | 6 + .../server/resourcemanager/RMContextImpl.java | 19 +- .../server/resourcemanager/ResourceManager.java | 63 +- .../resourcemanager/ResourceTrackerService.java | 68 + .../resourcemanager/amlauncher/AMLauncher.java | 58 +- .../metrics/AbstractSystemMetricsPublisher.java | 178 + .../metrics/AppAttemptFinishedEvent.java | 90 - .../metrics/AppAttemptRegisteredEvent.java | 81 - .../metrics/ApplicaitonStateUpdatedEvent.java | 47 - .../metrics/ApplicationACLsUpdatedEvent.java | 45 - .../metrics/ApplicationCreatedEvent.java | 132 - .../metrics/ApplicationFinishedEvent.java | 82 - .../metrics/ApplicationUpdatedEvent.java | 54 - .../metrics/ContainerCreatedEvent.java | 73 - .../metrics/ContainerFinishedEvent.java | 72 - .../metrics/NoOpSystemMetricPublisher.java | 70 + .../metrics/SystemMetricsEvent.java | 33 - .../metrics/SystemMetricsEventType.java | 37 - .../metrics/SystemMetricsPublisher.java | 615 +-- .../metrics/TimelineServiceV1Publisher.java | 386 ++ .../metrics/TimelineServiceV2Publisher.java | 449 +++ .../resourcemanager/metrics/package-info.java | 28 + .../server/resourcemanager/rmapp/RMApp.java | 23 + .../rmapp/RMAppCollectorUpdateEvent.java | 40 + .../resourcemanager/rmapp/RMAppEventType.java | 3 + .../server/resourcemanager/rmapp/RMAppImpl.java | 93 +- .../RMTimelineCollectorManager.java | 104 + .../timelineservice/package-info.java | 28 + .../server/resourcemanager/TestAppManager.java | 5 + 
.../resourcemanager/TestClientRMService.java | 15 +- .../server/resourcemanager/TestRMRestart.java | 63 + .../TestResourceTrackerService.java | 85 + .../applicationsmanager/MockAsm.java | 12 + .../TestRMAppLogAggregationStatus.java | 6 +- .../metrics/TestSystemMetricsPublisher.java | 6 +- .../TestSystemMetricsPublisherForV2.java | 398 ++ .../server/resourcemanager/rmapp/MockRMApp.java | 14 + .../rmapp/TestRMAppTransitions.java | 4 + .../hadoop-yarn-server-tests/pom.xml | 17 + .../hadoop/yarn/server/MiniYARNCluster.java | 7 +- .../hadoop/yarn/server/TestMiniYarnCluster.java | 2 - .../TestTimelineServiceClientIntegration.java | 180 + .../pom.xml | 401 ++ ...stTimelineReaderWebServicesHBaseStorage.java | 2165 ++++++++++ .../storage/TestHBaseTimelineStorage.java | 3751 ++++++++++++++++++ ...TestPhoenixOfflineAggregationWriterImpl.java | 161 + .../storage/flow/TestFlowDataGenerator.java | 386 ++ .../flow/TestHBaseStorageFlowActivity.java | 483 +++ .../storage/flow/TestHBaseStorageFlowRun.java | 1034 +++++ .../flow/TestHBaseStorageFlowRunCompaction.java | 831 ++++ .../src/test/resources/log4j.properties | 19 + .../hadoop-yarn-server-timelineservice/pom.xml | 203 + .../server/timelineservice/TimelineContext.java | 146 + .../collector/AppLevelTimelineCollector.java | 161 + .../collector/NodeTimelineCollectorManager.java | 223 ++ .../PerNodeTimelineCollectorsAuxService.java | 231 ++ .../collector/TimelineCollector.java | 341 ++ .../collector/TimelineCollectorContext.java | 76 + .../collector/TimelineCollectorManager.java | 254 ++ .../collector/TimelineCollectorWebService.java | 250 ++ .../timelineservice/collector/package-info.java | 29 + .../server/timelineservice/package-info.java | 28 + .../reader/TimelineDataToRetrieve.java | 147 + .../reader/TimelineEntityFilters.java | 242 ++ .../reader/TimelineParseConstants.java | 34 + .../reader/TimelineParseException.java | 36 + .../timelineservice/reader/TimelineParser.java | 37 + .../reader/TimelineParserForCompareExpr.java | 
300 ++ .../reader/TimelineParserForDataToRetrieve.java | 95 + .../reader/TimelineParserForEqualityExpr.java | 343 ++ .../reader/TimelineParserForExistFilters.java | 51 + .../reader/TimelineParserForKVFilters.java | 78 + .../reader/TimelineParserForNumericFilters.java | 72 + .../TimelineParserForRelationFilters.java | 71 + .../reader/TimelineReaderContext.java | 98 + .../reader/TimelineReaderManager.java | 179 + .../reader/TimelineReaderServer.java | 178 + .../reader/TimelineReaderUtils.java | 171 + .../reader/TimelineReaderWebServices.java | 2123 ++++++++++ .../reader/TimelineReaderWebServicesUtils.java | 247 ++ .../reader/TimelineUIDConverter.java | 247 ++ .../reader/filter/TimelineCompareFilter.java | 147 + .../reader/filter/TimelineCompareOp.java | 36 + .../reader/filter/TimelineExistsFilter.java | 107 + .../reader/filter/TimelineFilter.java | 68 + .../reader/filter/TimelineFilterList.java | 141 + .../reader/filter/TimelineFilterUtils.java | 290 ++ .../reader/filter/TimelineKeyValueFilter.java | 61 + .../reader/filter/TimelineKeyValuesFilter.java | 126 + .../reader/filter/TimelinePrefixFilter.java | 99 + .../reader/filter/package-info.java | 28 + .../timelineservice/reader/package-info.java | 29 + .../storage/FileSystemTimelineReaderImpl.java | 408 ++ .../storage/FileSystemTimelineWriterImpl.java | 158 + .../storage/HBaseTimelineReaderImpl.java | 88 + .../storage/HBaseTimelineWriterImpl.java | 571 +++ .../storage/OfflineAggregationWriter.java | 67 + .../PhoenixOfflineAggregationWriterImpl.java | 358 ++ .../storage/TimelineAggregationTrack.java | 28 + .../timelineservice/storage/TimelineReader.java | 180 + .../storage/TimelineSchemaCreator.java | 272 ++ .../timelineservice/storage/TimelineWriter.java | 87 + .../storage/application/ApplicationColumn.java | 156 + .../application/ApplicationColumnFamily.java | 65 + .../application/ApplicationColumnPrefix.java | 288 ++ .../storage/application/ApplicationRowKey.java | 206 + .../application/ApplicationRowKeyPrefix.java 
| 69 + .../storage/application/ApplicationTable.java | 161 + .../storage/application/package-info.java | 28 + .../storage/apptoflow/AppToFlowColumn.java | 148 + .../apptoflow/AppToFlowColumnFamily.java | 51 + .../storage/apptoflow/AppToFlowRowKey.java | 143 + .../storage/apptoflow/AppToFlowTable.java | 113 + .../storage/apptoflow/package-info.java | 28 + .../storage/common/AppIdKeyConverter.java | 95 + .../storage/common/BaseTable.java | 140 + .../common/BufferedMutatorDelegator.java | 73 + .../timelineservice/storage/common/Column.java | 80 + .../storage/common/ColumnFamily.java | 34 + .../storage/common/ColumnHelper.java | 388 ++ .../storage/common/ColumnPrefix.java | 145 + .../storage/common/EventColumnName.java | 63 + .../common/EventColumnNameConverter.java | 99 + .../storage/common/GenericConverter.java | 48 + .../storage/common/KeyConverter.java | 41 + .../storage/common/LongConverter.java | 94 + .../storage/common/LongKeyConverter.java | 68 + .../storage/common/NumericValueConverter.java | 39 + .../storage/common/OfflineAggregationInfo.java | 115 + .../timelineservice/storage/common/Range.java | 62 + .../storage/common/RowKeyPrefix.java | 42 + .../storage/common/Separator.java | 575 +++ .../storage/common/StringKeyConverter.java | 54 + .../common/TimelineEntityFiltersType.java | 71 + .../common/TimelineHBaseSchemaConstants.java | 71 + .../storage/common/TimelineStorageUtils.java | 586 +++ .../storage/common/TimestampGenerator.java | 116 + .../storage/common/TypedBufferedMutator.java | 28 + .../storage/common/ValueConverter.java | 47 + .../storage/common/package-info.java | 28 + .../storage/entity/EntityColumn.java | 160 + .../storage/entity/EntityColumnFamily.java | 65 + .../storage/entity/EntityColumnPrefix.java | 300 ++ .../storage/entity/EntityRowKey.java | 225 ++ .../storage/entity/EntityRowKeyPrefix.java | 74 + .../storage/entity/EntityTable.java | 161 + .../storage/entity/package-info.java | 28 + .../flow/AggregationCompactionDimension.java | 63 + 
.../storage/flow/AggregationOperation.java | 94 + .../timelineservice/storage/flow/Attribute.java | 39 + .../storage/flow/FlowActivityColumnFamily.java | 55 + .../storage/flow/FlowActivityColumnPrefix.java | 277 ++ .../storage/flow/FlowActivityRowKey.java | 196 + .../storage/flow/FlowActivityRowKeyPrefix.java | 60 + .../storage/flow/FlowActivityTable.java | 108 + .../storage/flow/FlowRunColumn.java | 182 + .../storage/flow/FlowRunColumnFamily.java | 54 + .../storage/flow/FlowRunColumnPrefix.java | 268 ++ .../storage/flow/FlowRunCoprocessor.java | 304 ++ .../storage/flow/FlowRunRowKey.java | 190 + .../storage/flow/FlowRunRowKeyPrefix.java | 54 + .../storage/flow/FlowRunTable.java | 141 + .../storage/flow/FlowScanner.java | 727 ++++ .../storage/flow/FlowScannerOperation.java | 46 + .../storage/flow/package-info.java | 29 + .../timelineservice/storage/package-info.java | 28 + .../storage/reader/ApplicationEntityReader.java | 481 +++ .../reader/FlowActivityEntityReader.java | 163 + .../storage/reader/FlowRunEntityReader.java | 269 ++ .../storage/reader/GenericEntityReader.java | 648 +++ .../storage/reader/TimelineEntityReader.java | 496 +++ .../reader/TimelineEntityReaderFactory.java | 89 + .../storage/reader/package-info.java | 28 + .../TestAppLevelTimelineCollector.java | 23 + .../TestNMTimelineCollectorManager.java | 170 + ...TestPerNodeTimelineCollectorsAuxService.java | 203 + .../collector/TestTimelineCollector.java | 127 + .../reader/TestTimelineReaderServer.java | 57 + .../reader/TestTimelineReaderUtils.java | 55 + .../reader/TestTimelineReaderWebServices.java | 561 +++ .../TestTimelineReaderWebServicesUtils.java | 923 +++++ .../reader/TestTimelineUIDConverter.java | 97 + .../TestFileSystemTimelineReaderImpl.java | 804 ++++ .../TestFileSystemTimelineWriterImpl.java | 129 + .../storage/common/TestKeyConverters.java | 130 + .../storage/common/TestRowKeys.java | 246 ++ .../storage/common/TestSeparator.java | 215 + .../src/test/resources/log4j.properties | 19 + 
.../hadoop-yarn/hadoop-yarn-server/pom.xml | 2 + .../src/site/markdown/TimelineServer.md | 2 +- .../src/site/markdown/TimelineServiceV2.md | 1199 ++++++ .../site/resources/images/flow_hierarchy.png | Bin 0 -> 42345 bytes .../src/site/resources/images/timeline_v2.jpg | Bin 0 -> 45112 bytes 347 files changed, 47545 insertions(+), 2069 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md b/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md index a7ded24..05b18b5 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md @@ -128,6 +128,7 @@ REST API compatibility corresponds to both the requests (URLs) and responses to * [MR Application Master](../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html) * [History Server](../../hadoop-mapreduce-client/hadoop-mapreduce-client-hs/HistoryServerRest.html) * [Timeline Server v1 REST API](../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html) +* [Timeline Service v2 REST API](../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html) #### Policy http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 1e258ac..deb2aab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -25,10 +25,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -44,16 +46,17 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.util.MRJobConfUtil; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; @@ -66,16 +69,15 @@ import 
org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; -import org.codehaus.jackson.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; @@ -133,8 +135,12 @@ public class JobHistoryEventHandler extends AbstractService protected TimelineClient timelineClient; + private boolean timelineServiceV2Enabled = false; + private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; + private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = + "MAPREDUCE_TASK_ATTEMPT"; public JobHistoryEventHandler(AppContext context, int startCount) { super("JobHistoryEventHandler"); @@ -254,19 +260,28 @@ public class JobHistoryEventHandler extends AbstractService MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD); + // TODO replace MR specific configurations on timeline service with getting + // configuration from RM through registerApplicationMaster() in + // ApplicationMasterProtocol with return value for timeline service + // configuration status: off, 
on_with_v1 or on_with_v2. if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - timelineClient = TimelineClient.createTimelineClient(); + LOG.info("Emitting job history data to the timeline service is enabled"); + if (YarnConfiguration.timelineServiceEnabled(conf)) { + + timelineClient = + ((MRAppMaster.RunningAppContext)context).getTimelineClient(); timelineClient.init(conf); - LOG.info("Timeline service is enabled"); - LOG.info("Emitting job history data to the timeline server is enabled"); + timelineServiceV2Enabled = + YarnConfiguration.timelineServiceV2Enabled(conf); + LOG.info("Timeline service is enabled; version: " + + YarnConfiguration.getTimelineServiceVersion(conf)); } else { LOG.info("Timeline service is not enabled"); } } else { - LOG.info("Emitting job history data to the timeline server is not enabled"); + LOG.info("Emitting job history data to the timeline server is not " + + "enabled"); } // Flag for setting @@ -591,8 +606,13 @@ public class JobHistoryEventHandler extends AbstractService processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); if (timelineClient != null) { - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); + if (timelineServiceV2Enabled) { + processEventForNewTimelineService(historyEvent, event.getJobID(), + event.getTimestamp()); + } else { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); + } } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " @@ -834,11 +854,11 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps()); tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces()); tEvent.addEventInfo("MAP_COUNTERS_GROUPS", - 
countersToJSON(jfe.getMapCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getMapCounters())); tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS", - countersToJSON(jfe.getReduceCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters())); tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS", - countersToJSON(jfe.getTotalCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters())); tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString()); tEntity.addEvent(tEvent); tEntity.setEntityId(jobId.toString()); @@ -864,7 +884,7 @@ public class JobHistoryEventHandler extends AbstractService tfe.getFailedAttemptID() == null ? "" : tfe.getFailedAttemptID().toString()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tfe.getCounters())); + JobHistoryEventUtils.countersToJSON(tfe.getCounters())); tEntity.addEvent(tEvent); tEntity.setEntityId(tfe.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); @@ -882,7 +902,7 @@ public class JobHistoryEventHandler extends AbstractService TaskFinishedEvent tfe2 = (TaskFinishedEvent) event; tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tfe2.getCounters())); + JobHistoryEventUtils.countersToJSON(tfe2.getCounters())); tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime()); tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID", @@ -904,7 +924,6 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("START_TIME", tase.getStartTime()); tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort()); tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName()); - tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString()); tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort()); tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ? 
"" : tase.getContainerId().toString()); @@ -937,7 +956,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime()); tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tauce.getCounters())); + JobHistoryEventUtils.countersToJSON(tauce.getCounters())); tEntity.addEvent(tEvent); tEntity.setEntityId(tauce.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); @@ -951,7 +970,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("STATE", mafe.getState()); tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(mafe.getCounters())); + JobHistoryEventUtils.countersToJSON(mafe.getCounters())); tEvent.addEventInfo("HOSTNAME", mafe.getHostname()); tEvent.addEventInfo("PORT", mafe.getPort()); tEvent.addEventInfo("RACK_NAME", mafe.getRackName()); @@ -973,7 +992,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime()); tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(rafe.getCounters())); + JobHistoryEventUtils.countersToJSON(rafe.getCounters())); tEvent.addEventInfo("HOSTNAME", rafe.getHostname()); tEvent.addEventInfo("PORT", rafe.getPort()); tEvent.addEventInfo("RACK_NAME", rafe.getRackName()); @@ -992,7 +1011,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("STATUS", tafe.getTaskStatus()); tEvent.addEventInfo("STATE", tafe.getState()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tafe.getCounters())); + JobHistoryEventUtils.countersToJSON(tafe.getCounters())); tEvent.addEventInfo("HOSTNAME", tafe.getHostname()); tEntity.addEvent(tEvent); tEntity.setEntityId(tafe.getTaskId().toString()); @@ -1042,24 +1061,254 @@ public class 
JobHistoryEventHandler extends AbstractService } } - @Private - public JsonNode countersToJSON(Counters counters) { - ArrayNode nodes = FACTORY.arrayNode(); - if (counters != null) { - for (CounterGroup counterGroup : counters) { - ObjectNode groupNode = nodes.addObject(); - groupNode.put("NAME", counterGroup.getName()); - groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); - ArrayNode countersNode = groupNode.putArray("COUNTERS"); - for (Counter counter : counterGroup) { - ObjectNode counterNode = countersNode.addObject(); - counterNode.put("NAME", counter.getName()); - counterNode.put("DISPLAY_NAME", counter.getDisplayName()); - counterNode.put("VALUE", counter.getValue()); + // create JobEntity from HistoryEvent with adding other info, like: + // jobId, timestamp and entityType. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createJobEntity(HistoryEvent event, long timestamp, JobId jobId, + String entityType, boolean setCreatedTime) { + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType, setCreatedTime); + entity.setId(jobId.toString()); + return entity; + } + + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createJobEntity(JobId jobId) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.setId(jobId.toString()); + entity.setType(MAPREDUCE_JOB_ENTITY_TYPE); + return entity; + } + + // create ApplicationEntity with job finished Metrics from HistoryEvent + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) { + ApplicationEntity entity = new ApplicationEntity(); + entity.setId(jobId.getAppId().toString()); + entity.setMetrics(event.getTimelineMetrics()); + return entity; + } + + // create BaseEntity from HistoryEvent with adding 
other info, like: + // timestamp and entityType. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createBaseEntity(HistoryEvent event, long timestamp, String entityType, + boolean setCreatedTime) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = + event.toTimelineEvent(); + tEvent.setTimestamp(timestamp); + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.addEvent(tEvent); + entity.setType(entityType); + if (setCreatedTime) { + entity.setCreatedTime(timestamp); + } + Set<TimelineMetric> timelineMetrics = event.getTimelineMetrics(); + if (timelineMetrics != null) { + entity.setMetrics(timelineMetrics); + } + return entity; + } + + // create TaskEntity from HistoryEvent with adding other info, like: + // taskId, jobId, timestamp, entityType and relatedJobEntity. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createTaskEntity(HistoryEvent event, long timestamp, String taskId, + String entityType, String relatedJobEntity, JobId jobId, + boolean setCreatedTime) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType, setCreatedTime); + entity.setId(taskId); + if (event.getEventType() == EventType.TASK_STARTED) { + entity.addInfo("TASK_TYPE", + ((TaskStartedEvent)event).getTaskType().toString()); + } + entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString()); + return entity; + } + + // create TaskAttemptEntity from HistoryEvent with adding other info, like: + // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId. 
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createTaskAttemptEntity(HistoryEvent event, long timestamp, + String taskAttemptId, String entityType, String relatedTaskEntity, + String taskId, boolean setCreatedTime) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType, setCreatedTime); + entity.setId(taskAttemptId); + entity.addIsRelatedToEntity(relatedTaskEntity, taskId); + return entity; + } + + private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event, + JobId jobId) { + if (event.getJobConf() == null) { + return; + } + // Publish job configurations both as job and app entity. + // Configs are split into multiple entities if they exceed 100kb in size. + org.apache.hadoop.yarn.api.records.timelineservice. + TimelineEntity jobEntityForConfigs = createJobEntity(jobId); + ApplicationEntity appEntityForConfigs = new ApplicationEntity(); + String appId = jobId.getAppId().toString(); + appEntityForConfigs.setId(appId); + try { + int configSize = 0; + for (Map.Entry<String, String> entry : event.getJobConf()) { + int size = entry.getKey().length() + entry.getValue().length(); + configSize += size; + if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) { + if (jobEntityForConfigs.getConfigs().size() > 0) { + timelineClient.putEntities(jobEntityForConfigs); + timelineClient.putEntities(appEntityForConfigs); + jobEntityForConfigs = createJobEntity(jobId); + appEntityForConfigs = new ApplicationEntity(); + appEntityForConfigs.setId(appId); + } + configSize = size; } + jobEntityForConfigs.addConfig(entry.getKey(), entry.getValue()); + appEntityForConfigs.addConfig(entry.getKey(), entry.getValue()); + } + if (configSize > 0) { + timelineClient.putEntities(jobEntityForConfigs); + timelineClient.putEntities(appEntityForConfigs); } + } catch (IOException | YarnException e) { + LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " 
+ + " for the job : " + jobId, e); + } + } + + private void processEventForNewTimelineService(HistoryEvent event, + JobId jobId, long timestamp) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = + null; + String taskId = null; + String taskAttemptId = null; + boolean setCreatedTime = false; + + switch (event.getEventType()) { + // Handle job events + case JOB_SUBMITTED: + setCreatedTime = true; + break; + case JOB_STATUS_CHANGED: + case JOB_INFO_CHANGED: + case JOB_INITED: + case JOB_PRIORITY_CHANGED: + case JOB_QUEUE_CHANGED: + case JOB_FAILED: + case JOB_KILLED: + case JOB_ERROR: + case JOB_FINISHED: + case AM_STARTED: + case NORMALIZED_RESOURCE: + break; + // Handle task events + case TASK_STARTED: + setCreatedTime = true; + taskId = ((TaskStartedEvent)event).getTaskId().toString(); + break; + case TASK_FAILED: + taskId = ((TaskFailedEvent)event).getTaskId().toString(); + break; + case TASK_UPDATED: + taskId = ((TaskUpdatedEvent)event).getTaskId().toString(); + break; + case TASK_FINISHED: + taskId = ((TaskFinishedEvent)event).getTaskId().toString(); + break; + case MAP_ATTEMPT_STARTED: + case REDUCE_ATTEMPT_STARTED: + setCreatedTime = true; + taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptStartedEvent)event). + getTaskAttemptId().toString(); + break; + case CLEANUP_ATTEMPT_STARTED: + case SETUP_ATTEMPT_STARTED: + taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptStartedEvent)event). + getTaskAttemptId().toString(); + break; + case MAP_ATTEMPT_FAILED: + case CLEANUP_ATTEMPT_FAILED: + case REDUCE_ATTEMPT_FAILED: + case SETUP_ATTEMPT_FAILED: + case MAP_ATTEMPT_KILLED: + case CLEANUP_ATTEMPT_KILLED: + case REDUCE_ATTEMPT_KILLED: + case SETUP_ATTEMPT_KILLED: + taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event). + getTaskId().toString(); + taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event). 
+ getTaskAttemptId().toString(); + break; + case MAP_ATTEMPT_FINISHED: + taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((MapAttemptFinishedEvent)event). + getAttemptId().toString(); + break; + case REDUCE_ATTEMPT_FINISHED: + taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((ReduceAttemptFinishedEvent)event). + getAttemptId().toString(); + break; + case SETUP_ATTEMPT_FINISHED: + case CLEANUP_ATTEMPT_FINISHED: + taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptFinishedEvent)event). + getAttemptId().toString(); + break; + default: + LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" + + " and handled by timeline service."); + return; + } + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + appEntityWithJobMetrics = null; + if (taskId == null) { + // JobEntity + tEntity = createJobEntity(event, timestamp, jobId, + MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime); + if (event.getEventType() == EventType.JOB_FINISHED + && event.getTimelineMetrics() != null) { + appEntityWithJobMetrics = createAppEntityWithJobMetrics(event, jobId); + } + } else { + if (taskAttemptId == null) { + // TaskEntity + tEntity = createTaskEntity(event, timestamp, taskId, + MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, + jobId, setCreatedTime); + } else { + // TaskAttemptEntity + tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId, + MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE, + taskId, setCreatedTime); + } + } + try { + if (appEntityWithJobMetrics == null) { + timelineClient.putEntitiesAsync(tEntity); + } else { + timelineClient.putEntities(tEntity, appEntityWithJobMetrics); + } + } catch (IOException | YarnException e) { + LOG.error("Failed to process Event " + event.getEventType() + + " for the job : " + jobId, e); + return; + } + if (event.getEventType() == EventType.JOB_SUBMITTED) { + // 
Publish configs after main job submitted event has been posted. + publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId); } - return nodes; } private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index ce1483c..f81aa39 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -137,6 +137,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -1046,6 +1047,7 @@ public class MRAppMaster extends CompositeService { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; + private TimelineClient timelineClient = null; private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; @@ -1055,6 +1057,18 @@ 
public class MRAppMaster extends CompositeService { this.clientToAMTokenSecretManager = new ClientToAMTokenSecretManager(appAttemptID, null); this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; + if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA) + && YarnConfiguration.timelineServiceEnabled(conf)) { + + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + // create new version TimelineClient + timelineClient = TimelineClient.createTimelineClient( + appAttemptID.getApplicationId()); + } else { + timelineClient = TimelineClient.createTimelineClient(); + } + } } @Override @@ -1145,6 +1159,10 @@ public class MRAppMaster extends CompositeService { return taskAttemptFinishingMonitor; } + // Get Timeline Collector's address (get sync from RM) + public TimelineClient getTimelineClient() { + return timelineClient; + } } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 8aee972..943a2ce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1443,7 +1443,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, 
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), getWorkflowAdjacencies(job.conf), - job.conf.get(MRJobConfig.WORKFLOW_TAGS, "")); + job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); //TODO JH Verify jobACLs, UserName via UGI? http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 9152b1b8..0e553b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; @@ -856,6 +857,15 @@ public class RMContainerAllocator extends RMContainerRequestor handleUpdatedNodes(response); handleJobPriorityChange(response); + // handle 
receiving the timeline collector address for this app + String collectorAddr = response.getCollectorAddr(); + MRAppMaster.RunningAppContext appContext = + (MRAppMaster.RunningAppContext)this.getContext(); + if (collectorAddr != null && !collectorAddr.isEmpty() + && appContext.getTimelineClient() != null) { + appContext.getTimelineClient().setTimelineServiceAddress( + response.getCollectorAddr()); + } for (ContainerStatus cont : finishedContainers) { processFinishedContainer(cont); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index 7612ceb..ac510b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -18,13 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.util.ArrayList; import java.util.Arrays; - -import static org.junit.Assert.*; +import java.util.Set; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +37,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import 
org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.junit.Test; public class TestEvents { @@ -405,6 +409,16 @@ public class TestEvents { this.datum = datum; } + @Override + public TimelineEvent toTimelineEvent() { + return null; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index d1a25b0..01c53e3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -53,8 +53,10 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.util.MRJobConfUtil; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext; import 
org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -66,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -556,7 +559,7 @@ public class TestJobHistoryEventHandler { // stored to the Timeline store @Test (timeout=50000) public void testTimelineEventHandling() throws Exception { - TestParams t = new TestParams(false); + TestParams t = new TestParams(RunningAppContext.class, false); Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); MiniYARNCluster yarnCluster = null; @@ -740,7 +743,7 @@ public class TestJobHistoryEventHandler { group2.addCounter("MARTHA_JONES", "Martha Jones", 3); group2.addCounter("DONNA_NOBLE", "Donna Noble", 2); group2.addCounter("ROSE_TYLER", "Rose Tyler", 1); - JsonNode jsonNode = jheh.countersToJSON(counters); + JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters); String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions " + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\"" @@ -763,19 +766,19 @@ public class TestJobHistoryEventHandler { public void testCountersToJSONEmpty() throws Exception { JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0); Counters counters = null; - JsonNode jsonNode = jheh.countersToJSON(counters); + JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters); String 
jsonStr = new ObjectMapper().writeValueAsString(jsonNode); String expected = "[]"; Assert.assertEquals(expected, jsonStr); counters = new Counters(); - jsonNode = jheh.countersToJSON(counters); + jsonNode = JobHistoryEventUtils.countersToJSON(counters); jsonStr = new ObjectMapper().writeValueAsString(jsonNode); expected = "[]"; Assert.assertEquals(expected, jsonStr); counters.addGroup("DOCTORS", "Incarnations of the Doctor"); - jsonNode = jheh.countersToJSON(counters); + jsonNode = JobHistoryEventUtils.countersToJSON(counters); jsonStr = new ObjectMapper().writeValueAsString(jsonNode); expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the " + "Doctor\",\"COUNTERS\":[]}]"; @@ -811,21 +814,30 @@ public class TestJobHistoryEventHandler { } } - private AppContext mockAppContext(ApplicationId appId, boolean isLastAMRetry) { - JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId)); - AppContext mockContext = mock(AppContext.class); + private Job mockJob() { Job mockJob = mock(Job.class); when(mockJob.getAllCounters()).thenReturn(new Counters()); when(mockJob.getTotalMaps()).thenReturn(10); when(mockJob.getTotalReduces()).thenReturn(10); when(mockJob.getName()).thenReturn("mockjob"); + return mockJob; + } + + private AppContext mockAppContext(Class<? extends AppContext> contextClass, + ApplicationId appId, boolean isLastAMRetry) { + JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId)); + AppContext mockContext = mock(contextClass); + Job mockJob = mockJob(); when(mockContext.getJob(jobId)).thenReturn(mockJob); when(mockContext.getApplicationID()).thenReturn(appId); when(mockContext.isLastAMRetry()).thenReturn(isLastAMRetry); + if (mockContext instanceof RunningAppContext) { + when(((RunningAppContext)mockContext).getTimelineClient()). 
+ thenReturn(TimelineClient.createTimelineClient()); + } return mockContext; } - private class TestParams { boolean isLastAMRetry; String workDir = setupTestWorkDir(); @@ -840,11 +852,15 @@ public class TestJobHistoryEventHandler { AppContext mockAppContext; public TestParams() { - this(false); + this(AppContext.class, false); } public TestParams(boolean isLastAMRetry) { + this(AppContext.class, isLastAMRetry); + } + public TestParams(Class<? extends AppContext> contextClass, + boolean isLastAMRetry) { this.isLastAMRetry = isLastAMRetry; - mockAppContext = mockAppContext(appId, this.isLastAMRetry); + mockAppContext = mockAppContext(contextClass, appId, this.isLastAMRetry); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index ce8d777..41fcf3b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext; import 
org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; @@ -1903,7 +1904,7 @@ public class TestRMContainerAllocator { private AllocateResponse allocateResponse; private static AppContext createAppContext( ApplicationAttemptId appAttemptId, Job job) { - AppContext context = mock(AppContext.class); + AppContext context = mock(RunningAppContext.class); ApplicationId appId = appAttemptId.getApplicationId(); when(context.getApplicationID()).thenReturn(appId); when(context.getApplicationAttemptId()).thenReturn(appAttemptId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java index 266aa94..a1447c5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java @@ -18,14 +18,18 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; 
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.avro.util.Utf8; - /** * Event to record start of a task attempt * @@ -166,4 +170,24 @@ public class AMStartedEvent implements HistoryEvent { public EventType getEventType() { return EventType.AM_STARTED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("APPLICATION_ATTEMPT_ID", + getAppAttemptId() == null ? "" : getAppAttemptId().toString()); + tEvent.addInfo("CONTAINER_ID", getContainerId() == null ? + "" : getContainerId().toString()); + tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost()); + tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort()); + tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort()); + tEvent.addInfo("START_TIME", getStartTime()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java index a30748c..1ba7195 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java @@ -18,8 +18,12 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Interface for event wrapper classes. Implementations each wrap an @@ -37,4 +41,18 @@ public interface HistoryEvent { /** Set the Avro datum wrapped by this. */ void setDatum(Object datum); + + /** + * Map HistoryEvent to TimelineEvent. + * + * @return the timeline event + */ + TimelineEvent toTimelineEvent(); + + /** + * Counters or Metrics if any else return null. + * + * @return the set of timeline metrics + */ + Set<TimelineMetric> getTimelineMetrics(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java index 0fa5868..ea21f60 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java @@ -18,11 +18,17 @@ package 
org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record successful completion of job @@ -133,4 +139,31 @@ public class JobFinishedEvent implements HistoryEvent { public Counters getReduceCounters() { return reduceCounters; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("NUM_MAPS", getFinishedMaps()); + tEvent.addInfo("NUM_REDUCES", getFinishedReduces()); + tEvent.addInfo("FAILED_MAPS", getFailedMaps()); + tEvent.addInfo("FAILED_REDUCES", getFailedReduces()); + tEvent.addInfo("FINISHED_MAPS", getFinishedMaps()); + tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces()); + // TODO replace SUCCEEDED with JobState.SUCCEEDED.toString() + tEvent.addInfo("JOB_STATUS", "SUCCEEDED"); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + Set<TimelineMetric> jobMetrics = JobHistoryEventUtils. + countersToTimelineMetric(getTotalCounters(), finishTime); + jobMetrics.addAll(JobHistoryEventUtils. + countersToTimelineMetric(getMapCounters(), finishTime, "MAP:")); + jobMetrics.addAll(JobHistoryEventUtils. 
+ countersToTimelineMetric(getReduceCounters(), finishTime, "REDUCE:")); + return jobMetrics; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java index b45f373..f5941aa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java @@ -18,13 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; +import java.util.Set; +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record changes in the submit and launch time of @@ -65,4 +67,17 @@ public class JobInfoChangeEvent implements HistoryEvent { return EventType.JOB_INFO_CHANGED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("SUBMIT_TIME", 
getSubmitTime()); + tEvent.addInfo("LAUNCH_TIME", getLaunchTime()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java index 5abb40e..784267f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java @@ -18,11 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the initialization of a job @@ -73,4 +77,21 @@ public class JobInitedEvent implements HistoryEvent { } /** Get whether the job's map and reduce stages were combined */ public boolean getUberized() { return datum.getUberized(); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("START_TIME", getLaunchTime()); + tEvent.addInfo("STATUS", getStatus()); + tEvent.addInfo("TOTAL_MAPS", getTotalMaps()); + tEvent.addInfo("TOTAL_REDUCES", getTotalReduces()); + tEvent.addInfo("UBERIZED", getUberized()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java index 6c32462..1616dd1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java @@ -18,14 +18,16 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; +import java.util.Set; +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.JobID; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to 
record the change of priority of a job @@ -65,4 +67,17 @@ public class JobPriorityChangeEvent implements HistoryEvent { return EventType.JOB_PRIORITY_CHANGED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("PRIORITY", getPriority().toString()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java index 86078e6..66f3781 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java @@ -18,8 +18,13 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @SuppressWarnings("deprecation") public class JobQueueChangeEvent implements HistoryEvent { @@ -60,4 +65,17 @@ public class JobQueueChangeEvent implements HistoryEvent { return 
null; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("QUEUE_NAMES", getJobQueueName()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java index 9e11a55..0963b45 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java @@ -18,13 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; +import java.util.Set; +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the change of status for a job @@ -61,4 +63,17 @@ public class JobStatusChangedEvent implements HistoryEvent { return 
EventType.JOB_STATUS_CHANGED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("STATUS", getStatus()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java index 845f6f7..e394f5b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java @@ -21,14 +21,18 @@ package org.apache.hadoop.mapreduce.jobhistory; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.security.authorize.AccessControlList; - -import org.apache.avro.util.Utf8; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; /** * Event to record the submission of a job @@ -38,6 +42,7 @@ import org.apache.avro.util.Utf8; @InterfaceStability.Unstable public class JobSubmittedEvent implements HistoryEvent { private JobSubmitted datum = new JobSubmitted(); + private JobConf jobConf = null; /** * Create an event to record job submission @@ -80,6 +85,31 @@ public class JobSubmittedEvent implements HistoryEvent { workflowAdjacencies, ""); } + /** + * Create an event to record job submission. + * @param id The job Id of the job + * @param jobName Name of the job + * @param userName Name of the user who submitted the job + * @param submitTime Time of submission + * @param jobConfPath Path of the Job Configuration file + * @param jobACLs The configured acls for the job. + * @param jobQueueName The job-queue to which this job was submitted to + * @param workflowId The Id of the workflow + * @param workflowName The name of the workflow + * @param workflowNodeName The node name of the workflow + * @param workflowAdjacencies The adjacencies of the workflow + * @param workflowTags Comma-separated tags for the workflow + */ + public JobSubmittedEvent(JobID id, String jobName, String userName, + long submitTime, String jobConfPath, + Map<JobACL, AccessControlList> jobACLs, String jobQueueName, + String workflowId, String workflowName, String workflowNodeName, + String workflowAdjacencies, String workflowTags) { + this(id, jobName, userName, submitTime, jobConfPath, jobACLs, + jobQueueName, workflowId, workflowName, workflowNodeName, + workflowAdjacencies, workflowTags, null); + } + /** * Create an event to record job submission * @param id The job Id of the job @@ -94,12 +124,13 @@ public class JobSubmittedEvent implements HistoryEvent { * @param workflowNodeName The node name of the workflow * @param workflowAdjacencies The adjacencies of the workflow * @param workflowTags Comma-separated tags for the workflow + * @param conf Job 
configuration */ public JobSubmittedEvent(JobID id, String jobName, String userName, long submitTime, String jobConfPath, Map<JobACL, AccessControlList> jobACLs, String jobQueueName, String workflowId, String workflowName, String workflowNodeName, - String workflowAdjacencies, String workflowTags) { + String workflowAdjacencies, String workflowTags, JobConf conf) { datum.setJobid(new Utf8(id.toString())); datum.setJobName(new Utf8(jobName)); datum.setUserName(new Utf8(userName)); @@ -129,6 +160,7 @@ public class JobSubmittedEvent implements HistoryEvent { if (workflowTags != null) { datum.setWorkflowTags(new Utf8(workflowTags)); } + jobConf = conf; } JobSubmittedEvent() {} @@ -206,4 +238,33 @@ public class JobSubmittedEvent implements HistoryEvent { /** Get the event type */ public EventType getEventType() { return EventType.JOB_SUBMITTED; } + public JobConf getJobConf() { + return jobConf; + } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("SUBMIT_TIME", getSubmitTime()); + tEvent.addInfo("QUEUE_NAME", getJobQueueName()); + tEvent.addInfo("JOB_NAME", getJobName()); + tEvent.addInfo("USER_NAME", getUserName()); + tEvent.addInfo("JOB_CONF_PATH", getJobConfPath()); + tEvent.addInfo("ACLS", getJobAcls()); + tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName()); + tEvent.addInfo("WORKFLOW_ID", getWorkflowId()); + tEvent.addInfo("WORKFLOW_NAME", getWorkflowName()); + tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName()); + tEvent.addInfo("WORKFLOW_ADJACENCIES", + getWorkflowAdjacencies()); + tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags()); + + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java index 1b7773d..ce6fa32 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java @@ -18,14 +18,18 @@ package org.apache.hadoop.mapreduce.jobhistory; -import com.google.common.base.Joiner; +import java.util.Collections; +import java.util.Set; import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import java.util.Collections; +import com.google.common.base.Joiner; /** * Event to record Failed and Killed completion of jobs @@ -119,4 +123,23 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent { final CharSequence diagnostics = datum.getDiagnostics(); return diagnostics == null ? 
NODIAGS : diagnostics.toString(); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("NUM_MAPS", getFinishedMaps()); + tEvent.addInfo("NUM_REDUCES", getFinishedReduces()); + tEvent.addInfo("JOB_STATUS", getStatus()); + tEvent.addInfo("DIAGNOSTICS", getDiagnostics()); + tEvent.addInfo("FINISHED_MAPS", getFinishedMaps()); + tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces()); + return tEvent; + } + + @Override + public Set<TimelineMetric> getTimelineMetrics() { + return null; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org