YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage. Contributed by Zhijie Shen
(cherry picked from commit 68c6232f8423e55b4d152ef3d1d66aeb2d6a555e) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84389771 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84389771 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84389771 Branch: refs/heads/YARN-2928 Commit: 84389771f99d3630e2c81bd6fcdc0f3e03fd2f93 Parents: 462c48a Author: Junping Du <junping...@apache.org> Authored: Thu Apr 9 18:04:27 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue Aug 25 10:47:09 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../applications/distributedshell/Client.java | 36 +++++++++++++------ .../distributedshell/TestDistributedShell.java | 13 +++---- .../yarn/util/timeline/TimelineUtils.java | 34 +++++++++++++++--- .../GetTimelineCollectorContextResponse.java | 17 +++++---- ...tTimelineCollectorContextResponsePBImpl.java | 38 +++++++++++++------- .../yarn_server_common_service_protos.proto | 5 +-- .../java/org/apache/hadoop/yarn/TestRPC.java | 7 ++-- .../collectormanager/NMCollectorService.java | 2 +- .../containermanager/ContainerManagerImpl.java | 18 ++++++---- .../application/Application.java | 6 ++-- .../application/ApplicationImpl.java | 27 +++++++++----- .../application/TestApplication.java | 2 +- .../yarn/server/nodemanager/webapp/MockApp.java | 23 +++++++++--- .../nodemanager/webapp/TestNMWebServices.java | 2 +- .../server/resourcemanager/ClientRMService.java | 21 +++++++++++ .../resourcemanager/amlauncher/AMLauncher.java | 30 ++++++++-------- .../TestTimelineServiceClientIntegration.java | 2 +- .../collector/AppLevelTimelineCollector.java | 10 +++--- .../collector/TimelineCollector.java | 4 +-- .../collector/TimelineCollectorContext.java | 32 +++++++++++------ .../collector/TimelineCollectorManager.java | 15 ++++---- .../storage/FileSystemTimelineWriterImpl.java | 13 +++---- .../timelineservice/storage/TimelineWriter.java | 7 ++-- ...TestPerNodeTimelineCollectorsAuxService.java | 2 +- .../collector/TestTimelineCollectorManager.java | 3 +- .../TestFileSystemTimelineWriterImpl.java | 8 +++-- 27 files changed, 256 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a3a1f14..aea859a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -50,6 +50,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3334. NM uses timeline client to publish container metrics to new timeline service. (Junping Du via zjshen) + YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage. + (Zhijie Shen via junping_du) + IMPROVEMENTS OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index db69490..ff2f594 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -185,8 +185,9 @@ public class Client { // Timeline domain writer access control private String modifyACLs = null; - private String flowId = null; - private String flowRunId = null; + private String flowName = null; + private String flowVersion = null; + private long flowRunId = 0L; // Command line options private Options opts; @@ -289,9 +290,11 @@ public class Client { + "modify the timeline entities in the given domain"); opts.addOption("create", false, "Flag to indicate whether to create the " + "domain specified with -domain."); - opts.addOption("flow", true, "ID of the flow which the distributed shell " + opts.addOption("flow_name", true, "Flow name which the distributed shell " + "app belongs to"); - opts.addOption("flow_run", true, "ID of the flowrun which the distributed " + opts.addOption("flow_version", true, "Flow version which the distributed " + + "shell app belongs to"); + opts.addOption("flow_run_id", true, "Flow run ID which the distributed " + "shell app belongs to"); opts.addOption("help", false, "Print usage"); opts.addOption("node_label_expression", true, @@ -452,11 +455,19 @@ public class Client { } } - if (cliParser.hasOption("flow")) { - flowId = cliParser.getOptionValue("flow"); + if (cliParser.hasOption("flow_name")) { + flowName = cliParser.getOptionValue("flow_name"); + } + if (cliParser.hasOption("flow_version")) { + flowVersion = cliParser.getOptionValue("flow_version"); } - if (cliParser.hasOption("flow_run")) { - flowRunId = cliParser.getOptionValue("flow_run"); + if (cliParser.hasOption("flow_run_id")) { + try { + flowRunId = Long.valueOf(cliParser.getOptionValue("flow_run_id")); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Flow run is not a valid long value", e); + } } return true; } @@ -550,10 +561,13 @@ public class Client { } Set<String> tags = new HashSet<String>(); - if (flowId != null) { - tags.add(TimelineUtils.generateFlowIdTag(flowId)); + if (flowName != null) { + tags.add(TimelineUtils.generateFlowNameTag(flowName)); + } + if (flowVersion != null) { + tags.add(TimelineUtils.generateFlowVersionTag(flowVersion)); } - if (flowRunId != null) { + if (flowRunId != 0) { tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId)); } appContext.setApplicationTags(tags); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index daaad7c..cc5f5e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -237,9 +237,11 @@ public class TestDistributedShell { args = mergeArgs(args, timelineArgs); if (!defaultFlow) { String[] flowArgs = { - "--flow", - "test_flow_id", - "--flow_run", + "--flow_name", + "test_flow_name", + "--flow_version", + "test_flow_version", + "--flow_run_id", "12345678" }; args = mergeArgs(args, flowArgs); @@ -368,7 +370,8 @@ public class TestDistributedShell { UserGroupInformation.getCurrentUser().getShortUserName() + (defaultFlow ? "/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + - "/0/" : "/test_flow_id/12345678/") + appId.toString(); + "/1/1/" : "/test_flow_name/test_flow_version/12345678/") + + appId.toString(); // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs String outputDirApp = basePath + "/DS_APP_ATTEMPT/"; @@ -393,8 +396,6 @@ public class TestDistributedShell { String containerFileName = outputDirContainer + containerTimestampFileName; File containerFile = new File(containerFileName); Assert.assertTrue(containerFile.exists()); - String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() - + "_"; // Verify NM posting container metrics info. String outputDirContainerMetrics = basePath + "/" + http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index ce14562..137b7c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -44,7 +44,8 @@ import org.codehaus.jackson.map.ObjectMapper; @Evolving public class TimelineUtils { - public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG"; + public static final String FLOW_NAME_TAG_PREFIX = "TIMELINE_FLOW_NAME_TAG"; + public static final String FLOW_VERSION_TAG_PREFIX = "TIMELINE_FLOW_VERSION_TAG"; public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG"; private static ObjectMapper mapper; @@ -128,11 +129,36 @@ public class TimelineUtils { return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId(); } - public static String generateFlowIdTag(String flowId) { - return FLOW_ID_TAG_PREFIX + ":" + flowId; + /** + * Generate flow name tag + * + * @param flowName flow name that identifies a distinct flow application which + * can be run repeatedly over time + * @return + */ + public static String generateFlowNameTag(String flowName) { + return FLOW_NAME_TAG_PREFIX + ":" + flowName; + } + + /** + * Generate flow version tag + * + * @param flowVersion flow version that keeps track of the changes made to the + * flow + * @return + */ + public static String generateFlowVersionTag(String flowVersion) { + return FLOW_VERSION_TAG_PREFIX + ":" + flowVersion; } - public static String generateFlowRunIdTag(String flowRunId) { + /** + * Generate flow run ID tag + * + * @param flowRunId flow run ID that identifies one instance (or specific + * execution) of that flow + * @return + */ + public static String generateFlowRunIdTag(long flowRunId) { return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java index 1558e2f..bd5c11e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java @@ -23,11 +23,12 @@ import org.apache.hadoop.yarn.util.Records; public abstract class GetTimelineCollectorContextResponse { public static GetTimelineCollectorContextResponse newInstance( - String userId, String flowId, String flowRunId) { + String userId, String flowName, String flowVersion, long flowRunId) { GetTimelineCollectorContextResponse response = Records.newRecord(GetTimelineCollectorContextResponse.class); response.setUserId(userId); - response.setFlowId(flowId); + response.setFlowName(flowName); + response.setFlowVersion(flowVersion); response.setFlowRunId(flowRunId); return response; } @@ -36,11 +37,15 @@ public abstract class GetTimelineCollectorContextResponse { public abstract void setUserId(String userId); - public abstract String getFlowId(); + public abstract String getFlowName(); - public abstract void setFlowId(String flowId); + public abstract void setFlowName(String flowName); - public abstract String getFlowRunId(); + public abstract String getFlowVersion(); - public abstract void setFlowRunId(String flowRunId); + public abstract void setFlowVersion(String flowVersion); + + public abstract long getFlowRunId(); + + public abstract void setFlowRunId(long flowRunId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java index 6dc1f77..34713cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java @@ -102,40 +102,52 @@ public class GetTimelineCollectorContextResponsePBImpl extends } @Override - public String getFlowId() { + public String getFlowName() { GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasFlowId()) { + if (!p.hasFlowName()) { return null; } - return p.getFlowId(); + return p.getFlowName(); } @Override - public void setFlowId(String flowId) { + public void setFlowName(String flowName) { maybeInitBuilder(); - if (flowId == null) { - builder.clearFlowId(); + if (flowName == null) { + builder.clearFlowName(); return; } - builder.setFlowId(flowId); + builder.setFlowName(flowName); } @Override - public String getFlowRunId() { + public String getFlowVersion() { GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasFlowRunId()) { + if (!p.hasFlowVersion()) { return null; } - return p.getFlowRunId(); + return p.getFlowVersion(); } @Override - public void setFlowRunId(String flowRunId) { + public void setFlowVersion(String flowVersion) { maybeInitBuilder(); - if (flowRunId == null) { - builder.clearFlowRunId(); + if (flowVersion == null) { + builder.clearFlowVersion(); return; } + builder.setFlowVersion(flowVersion); + } + + @Override + public long getFlowRunId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; + return p.getFlowRunId(); + } + + @Override + public void setFlowRunId(long flowRunId) { + maybeInitBuilder(); builder.setFlowRunId(flowRunId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 2234752..22c4cf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -115,8 +115,9 @@ message GetTimelineCollectorContextRequestProto { message GetTimelineCollectorContextResponseProto { optional string user_id = 1; - optional string flow_id = 2; - optional string flow_run_id = 3; + optional string flow_name = 2; + optional string flow_version = 3; + optional int64 flow_run_id = 4; } message NMContainerStatusProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 3c9f57b..52ecd73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -176,8 +176,9 @@ public class TestRPC { GetTimelineCollectorContextResponse response = proxy.getTimelineCollectorContext(request); Assert.assertEquals("test_user_id", response.getUserId()); - Assert.assertEquals("test_flow_id", response.getFlowId()); - Assert.assertEquals("test_flow_run_id", response.getFlowRunId()); + Assert.assertEquals("test_flow_name", response.getFlowName()); + Assert.assertEquals("test_flow_version", response.getFlowVersion()); + Assert.assertEquals(12345678L, response.getFlowRunId()); } catch (YarnException | IOException e) { Assert.fail("RPC call failured is not expected here."); } @@ -374,7 +375,7 @@ public class TestRPC { throws YarnException, IOException { if (request.getApplicationId().getId() == 1) { return GetTimelineCollectorContextResponse.newInstance( - "test_user_id", "test_flow_id", "test_flow_run_id"); + "test_user_id", "test_flow_name", "test_flow_version", 12345678L); } else { throw new YarnException("The application is not found."); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index f37be23..dc5601f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -130,6 +130,6 @@ public class NMCollectorService extends CompositeService implements " doesn't exist on NM."); } return GetTimelineCollectorContextResponse.newInstance( - app.getUser(), app.getFlowId(), app.getFlowRunId()); + app.getUser(), app.getFlowName(), app.getFlowVersion(), app.getFlowRunId()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8a60dcd..4dd9fa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -300,7 +300,7 @@ public class ContainerManagerImpl extends CompositeService implements LOG.info("Recovering application " + appId); //TODO: Recover flow and flow run ID ApplicationImpl app = new ApplicationImpl( - dispatcher, p.getUser(), null, null, appId, creds, context); + dispatcher, p.getUser(), null, null, 0, appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -870,12 +870,18 @@ public class ContainerManagerImpl extends CompositeService implements try { if (!serviceStopped) { // Create the application - String flowId = launchContext.getEnvironment().get( - TimelineUtils.FLOW_ID_TAG_PREFIX); - String flowRunId = launchContext.getEnvironment().get( + String flowName = launchContext.getEnvironment().get( + TimelineUtils.FLOW_NAME_TAG_PREFIX); + String flowVersion = launchContext.getEnvironment().get( + TimelineUtils.FLOW_VERSION_TAG_PREFIX); + String flowRunIdStr = launchContext.getEnvironment().get( TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); - Application application = new ApplicationImpl( - dispatcher, user, flowId, flowRunId, applicationID, credentials, context); + long flowRunId = 0L; + if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { + flowRunId = Long.valueOf(flowRunIdStr); + } + Application application = new ApplicationImpl(dispatcher, user, + flowName, flowVersion, flowRunId, applicationID, credentials, context); if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index 0c95193..5de3398 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -36,9 +36,11 @@ public interface Application extends EventHandler<ApplicationEvent> { ApplicationState getApplicationState(); - String getFlowId(); + String getFlowName(); - String getFlowRunId(); + String getFlowVersion(); + + long getFlowRunId(); TimelineClient getTimelineClient(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index b0ab550..9319b6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -67,8 +67,9 @@ public class ApplicationImpl implements Application { final Dispatcher dispatcher; final String user; - final String flowId; - final String flowRunId; + final String flowName; + final String flowVersion; + final long flowRunId; final ApplicationId appId; final Credentials credentials; Map<ApplicationAccessType, String> applicationACLs; @@ -85,12 +86,13 @@ public class ApplicationImpl implements Application { Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>(); - public ApplicationImpl(Dispatcher dispatcher, String user, String flowId, - String flowRunId, ApplicationId appId, Credentials credentials, - Context context) { + public ApplicationImpl(Dispatcher dispatcher, String user, String flowName, + String flowVersion, long flowRunId, ApplicationId appId, + Credentials credentials, Context context) { this.dispatcher = dispatcher; this.user = user; - this.flowId = flowId; + this.flowName = flowName; + this.flowVersion = flowVersion; this.flowRunId = flowRunId; this.appId = appId; this.credentials = credentials; @@ -517,11 +519,18 @@ public class ApplicationImpl implements Application { } } - public String getFlowId() { - return flowId; + @Override + public String getFlowName() { + return flowName; } - public String getFlowRunId() { + @Override + public String getFlowVersion() { + return flowVersion; + } + + @Override + public long getFlowRunId() { return flowRunId; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 5303df5..3889d2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -531,7 +531,7 @@ public class TestApplication { this.appId = BuilderUtils.newApplicationId(timestamp, id); app = new ApplicationImpl( - dispatcher, this.user, null, null, appId, null, context); + dispatcher, this.user, null, null, 0, appId, null, context); containers = new ArrayList<Container>(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 2ee572b..4d1be84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -40,8 +40,9 @@ public class MockApp implements Application { Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>(); ApplicationState appState; Application app; - String flowId; - String flowRunId; + String flowName; + String flowVersion; + long flowRunId; TimelineClient timelineClient = null; public MockApp(int uniqId) { @@ -59,6 +60,14 @@ public class MockApp implements Application { appState = ApplicationState.NEW; } + public MockApp(String user, long clusterTimeStamp, int uniqId, + String flowName, String flowVersion, long flowRunId) { + this(user, clusterTimeStamp, uniqId); + this.flowName = flowName; + this.flowVersion = flowVersion; + this.flowRunId = flowRunId; + } + public void setState(ApplicationState state) { this.appState = state; } @@ -81,11 +90,15 @@ public class MockApp implements Application { public void handle(ApplicationEvent event) {} - public String getFlowId() { - return flowId; + public String getFlowName() { + return flowName; + } + + public String getFlowVersion() { + return flowVersion; } - public String getFlowRunId() { + public long getFlowRunId() { return flowRunId; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 476f3df..ef5eb65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -327,7 +327,7 @@ public class TestNMWebServices extends JerseyTestBase { final String filename = "logfile1"; final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", - null, null, appId, null, nmContext)); + null, null, 0, appId, null, nmContext)); MockContainer container = new MockContainer(appAttemptId, new AsyncDispatcher(), new Configuration(), "user", appId, 1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 3e16165..666c8f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -154,6 +154,7 @@ import org.apache.hadoop.yarn.util.UTCClock; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** @@ -554,6 +555,26 @@ public class ClientRMService extends AbstractService implements throw RPCUtil.getRemoteException(ie); } + // Sanity check for flow run + String value = null; + try { + for (String tag : submissionContext.getApplicationTags()) { + if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || + tag.startsWith( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { + value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1); + Long.valueOf(value); + } + } + } catch (NumberFormatException e) { + LOG.warn("Invalid to flow run: " + value + + ". Flow run should be a long integer", e); + RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, + e.getMessage(), "ClientRMService", + "Exception in submitting application", applicationId); + throw RPCUtil.getRemoteException(e); + } + // Check whether app has already been put into rmContext, // If it is, simply return the response if (rmContext.getRMApps().get(applicationId) != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 5c0b02b..37aa3e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -222,22 +222,9 @@ public class AMLauncher implements Runnable { // Set flow context info for (String tag : rmContext.getRMApps().get(applicationId).getApplicationTags()) { - if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX + ":") || - tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) { - String value = tag.substring( - TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1); - if (!value.isEmpty()) { - environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value); - } - } - if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || - tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { - String value = tag.substring( - TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1); - if (!value.isEmpty()) { - environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value); - } - } + setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag); + setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag); + setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag); } Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); @@ -259,6 +246,17 @@ public class AMLauncher implements Runnable { container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } + private static void setFlowTags( + Map<String, String> environment, String tagPrefix, String tag) { + if (tag.startsWith(tagPrefix + ":") || + tag.startsWith(tagPrefix.toLowerCase() + ":")) { + String value = tag.substring(tagPrefix.length() + 1); + if (!value.isEmpty()) { + environment.put(tagPrefix, value); + } + } + } + @VisibleForTesting protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() { Token<AMRMTokenIdentifier> amrmToken = http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index c8b9625..54c806c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -97,7 +97,7 @@ public class TestTimelineServiceClientIntegration { mock(CollectorNodemanagerProtocol.class); try { GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance(null, null, null); + GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L); when(protocol.getTimelineCollectorContext(any( GetTimelineCollectorContextRequest.class))).thenReturn(response); } catch (YarnException | IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 60ddde5..5bc70e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -54,10 +54,12 @@ public class AppLevelTimelineCollector extends TimelineCollector { // context info from NM. // Current user usually is not the app user, but keep this field non-null context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName()); - // Use app ID to generate a default flow ID for orphan app - context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId)); - // Set the flow run ID to 0 if it's an orphan app - context.setFlowRunId("0"); + // Use app ID to generate a default flow name for orphan app + context.setFlowName(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId)); + // Set the flow version to string 1 if it's an orphan app + context.setFlowVersion("1"); + // Set the flow run ID to 1 if it's an orphan app + context.setFlowRunId(1L); context.setAppId(appId.toString()); super.serviceInit(conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 677feb1..f1d3d72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -100,8 +100,8 @@ public abstract class TimelineCollector extends CompositeService { TimelineCollectorContext context = getTimelineEntityContext(); return writer.write(context.getClusterId(), context.getUserId(), - context.getFlowId(), context.getFlowRunId(), context.getAppId(), - entities); + context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(), + context.getAppId(), entities); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java index c1a10a6..6cc477f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java @@ -22,19 +22,21 @@ public class TimelineCollectorContext { private String clusterId; private String userId; - private String flowId; - private String flowRunId; + private String flowName; + private String flowVersion; + private long flowRunId; private String appId; public TimelineCollectorContext() { - this(null, null, null, null, null); + this(null, null, null, null, 0L, null); } public TimelineCollectorContext(String clusterId, String userId, - String flowId, String flowRunId, String appId) { + String flowName, String flowVersion, long flowRunId, String appId) { this.clusterId = clusterId; this.userId = userId; - this.flowId = flowId; + this.flowName = flowName; + this.flowVersion = flowVersion; this.flowRunId = flowRunId; this.appId = appId; } @@ -55,19 +57,27 @@ public class TimelineCollectorContext { this.userId = userId; } - public String getFlowId() { - return flowId; + public String getFlowName() { + return flowName; } - public void setFlowId(String flowId) { - this.flowId = flowId; + public void setFlowName(String flowName) { + this.flowName = flowName; } - public String getFlowRunId() { + public String getFlowVersion() { + return flowVersion; + } + + public void setFlowVersion(String flowVersion) { + this.flowVersion = flowVersion; + } + + public long getFlowRunId() { return flowRunId; } - public void setFlowRunId(String flowRunId) { + public void setFlowRunId(long flowRunId) { this.flowRunId = flowRunId; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 5f23c25..9a566a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -35,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -273,12 +272,16 @@ public class TimelineCollectorManager extends CompositeService { if (userId != null && !userId.isEmpty()) { collector.getTimelineEntityContext().setUserId(userId); } - String flowId = response.getFlowId(); - if (flowId != null && !flowId.isEmpty()) { - collector.getTimelineEntityContext().setFlowId(flowId); + String flowName = response.getFlowName(); + if (flowName != null && !flowName.isEmpty()) { + collector.getTimelineEntityContext().setFlowName(flowName); } - String flowRunId = response.getFlowRunId(); - if (flowRunId != null && !flowRunId.isEmpty()) { + String flowVersion = response.getFlowVersion(); + if (flowVersion != null && !flowVersion.isEmpty()) { + collector.getTimelineEntityContext().setFlowVersion(flowVersion); + } + long flowRunId = response.getFlowRunId(); + if (flowRunId != 0L) { collector.getTimelineEntityContext().setFlowRunId(flowRunId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 41b6ac9..dd8ad06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -65,22 +65,23 @@ public class FileSystemTimelineWriterImpl extends AbstractService @Override public TimelineWriteResponse write(String clusterId, String userId, - String flowId, String flowRunId, String appId, + String flowName, String flowVersion, long flowRunId, String appId, TimelineEntities entities) throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); for (TimelineEntity entity : entities.getEntities()) { - write(clusterId, userId, flowId, flowRunId, appId, entity, response); + write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, + response); } return response; } - private void write(String clusterId, String userId, - String flowId, String flowRunId, String appId, TimelineEntity entity, + private void write(String clusterId, String userId, String flowName, + String flowVersion, long flowRun, String appId, TimelineEntity entity, TimelineWriteResponse response) throws IOException { PrintWriter out = null; try { - String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowId, - flowRunId, appId, entity.getType()); + String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName, + flowVersion, String.valueOf(flowRun), appId, entity.getType()); String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION; out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true))); out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 492e3a9..467bcec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -41,8 +41,9 @@ public interface TimelineWriter extends Service { * * @param clusterId context cluster ID * @param userId context user ID - * @param flowId context flow ID - * @param flowRunId context flow run ID + * @param flowName context flow name + * @param flowVersion context flow version + * @param flowRunId * @param appId context app ID * @param data * a {@link TimelineEntities} object. @@ -50,7 +51,7 @@ public interface TimelineWriter extends Service { * @throws IOException */ TimelineWriteResponse write(String clusterId, String userId, - String flowId, String flowRunId, String appId, + String flowName, String flowVersion, long flowRunId, String appId, TimelineEntities data) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index 1de8d6d..abbe13a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -162,7 +162,7 @@ public class TestPerNodeTimelineCollectorsAuxService { CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class); GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance(null, null, null); + GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L); try { when(nmCollectorService.getTimelineCollectorContext(any( GetTimelineCollectorContextRequest.class))).thenReturn(response); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java index 36bda85..c662998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -146,7 +147,7 @@ public class TestTimelineCollectorManager { CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class); GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance(null, null, null); + GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L); try { when(nmCollectorService.getTimelineCollectorContext(any( GetTimelineCollectorContextRequest.class))).thenReturn(response); http://git-wip-us.apache.org/repos/asf/hadoop/blob/84389771/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 407b5f6..50a9f60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -57,11 +57,13 @@ public class TestFileSystemTimelineWriterImpl { fsi = new FileSystemTimelineWriterImpl(); fsi.init(new YarnConfiguration()); fsi.start(); - fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te); + fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L, + "app_id", te); String fileName = fsi.getOutputRoot() + - "/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type + - "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" + + type + "/" + id + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; Path path = Paths.get(fileName); File f = new File(fileName); assertTrue(f.exists() && !f.isDirectory());