http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 0a9895e..d0c1198 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -19,17 +19,23 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; @@ -52,6 +58,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private Set<NodeLabel> labels = null; private List<LogAggregationReport> logAggregationReportsForApps = null; + private Map<ApplicationId, String> registeredCollectors = null; + public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); } @@ -106,6 +114,9 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } + if (this.registeredCollectors != null) { + addRegisteredCollectorsToProto(); + } } private void addLogAggregationStatusForAppsToProto() { @@ -147,6 +158,17 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { return ((LogAggregationReportPBImpl) value).getProto(); } + private void addRegisteredCollectorsToProto() { + maybeInitBuilder(); + builder.clearRegisteredCollectors(); + for (Map.Entry<ApplicationId, String> entry : + registeredCollectors.entrySet()) { + builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setAppCollectorAddr(entry.getValue())); + } + } + private 
void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); @@ -228,6 +250,38 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { this.lastKnownNMTokenMasterKey = masterKey; } + @Override + public Map<ApplicationId, String> getRegisteredCollectors() { + if (this.registeredCollectors != null) { + return this.registeredCollectors; + } + initRegisteredCollectors(); + return registeredCollectors; + } + + private void initRegisteredCollectors() { + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList(); + if (!list.isEmpty()) { + this.registeredCollectors = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + } + } + } + + @Override + public void setRegisteredCollectors( + Map<ApplicationId, String> registeredCollectors) { + if (registeredCollectors == null || registeredCollectors.isEmpty()) { + return; + } + maybeInitBuilder(); + this.registeredCollectors = new HashMap<ApplicationId, String>(); + this.registeredCollectors.putAll(registeredCollectors); + } + private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { return new NodeStatusPBImpl(p); } @@ -236,6 +290,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { return ((NodeStatusPBImpl)t).getProto(); } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { return new MasterKeyPBImpl(p); }
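The registered_collectors plumbing added above follows the usual PB-record pattern: the map is kept on the Java side until getProto() merges it into the builder, and a copy constructed from that proto re-creates the map lazily on first read. Below is a rough usage sketch (not part of this patch; it only assumes the accessors shown in the diff above and mirrors the round-trip check in TestYarnServerApiClasses further down):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;

// Hypothetical wrapper class, for illustration only.
public class RegisteredCollectorsRoundTrip {
  public static void main(String[] args) {
    // Map each running application to the address of its timeline collector.
    Map<ApplicationId, String> collectors = new HashMap<>();
    collectors.put(ApplicationId.newInstance(1L, 1), "localhost:0");

    // Set the map on the request; it is merged into the proto by getProto().
    NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
    original.setRegisteredCollectors(collectors);

    // Rebuild the record from the serialized proto and read the map back.
    NodeHeartbeatRequestPBImpl copy =
        new NodeHeartbeatRequestPBImpl(original.getProto());
    System.out.println(collectors.equals(copy.getRegisteredCollectors()));
  }
}
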
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 3422697..cd85241 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; @@ -68,6 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends private List<ApplicationId> applicationsToCleanup = null; private Map<ApplicationId, ByteBuffer> systemCredentials = null; private Resource resource = null; + private Map<ApplicationId, String> appCollectorsMap = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -125,6 +127,9 @@ public class NodeHeartbeatResponsePBImpl extends if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } + if (this.appCollectorsMap != null) { + addAppCollectorsMapToProto(); + } } private void addSystemCredentialsToProto() { @@ -138,6 +143,16 @@ public class NodeHeartbeatResponsePBImpl extends } } + private void addAppCollectorsMapToProto() { + maybeInitBuilder(); + builder.clearAppCollectorsMap(); + for (Map.Entry<ApplicationId, String> entry : appCollectorsMap.entrySet()) { + builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setAppCollectorAddr(entry.getValue())); + } + } + private void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); @@ -551,6 +566,15 @@ public class NodeHeartbeatResponsePBImpl extends return systemCredentials; } + @Override + public Map<ApplicationId, String> getAppCollectorsMap() { + if (this.appCollectorsMap != null) { + return this.appCollectorsMap; + } + initAppCollectorsMap(); + return appCollectorsMap; + } + private void initSystemCredentials() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList(); @@ -562,6 +586,18 @@ public class NodeHeartbeatResponsePBImpl extends } } + private void initAppCollectorsMap() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List<AppCollectorsMapProto> list = p.getAppCollectorsMapList(); + if (!list.isEmpty()) { + this.appCollectorsMap = new HashMap<>(); + for (AppCollectorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + } + } + } + @Override public void setSystemCredentialsForApps( Map<ApplicationId, ByteBuffer> systemCredentials) { @@ -574,6 +610,17 @@ public class NodeHeartbeatResponsePBImpl extends } @Override + public void setAppCollectorsMap( + Map<ApplicationId, String> appCollectorsMap) { + if (appCollectorsMap == null || appCollectorsMap.isEmpty()) { + return; + } + maybeInitBuilder(); + this.appCollectorsMap = new HashMap<ApplicationId, String>(); + this.appCollectorsMap.putAll(appCollectorsMap); + } + + @Override public long getNextHeartBeatInterval() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; return (p.getNextHeartBeatInterval()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java new file mode 100644 index 0000000..c6f6619 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl; + +public class ReportNewCollectorInfoRequestPBImpl extends + ReportNewCollectorInfoRequest { + + private ReportNewCollectorInfoRequestProto proto = + ReportNewCollectorInfoRequestProto.getDefaultInstance(); + + private ReportNewCollectorInfoRequestProto.Builder builder = null; + private boolean viaProto = false; + + private List<AppCollectorsMap> collectorsList = null; + + public ReportNewCollectorInfoRequestPBImpl() { + builder = ReportNewCollectorInfoRequestProto.newBuilder(); + } + + public ReportNewCollectorInfoRequestPBImpl( + ReportNewCollectorInfoRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReportNewCollectorInfoRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (collectorsList != null) { + addLocalCollectorsToProto(); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReportNewCollectorInfoRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void addLocalCollectorsToProto() { + maybeInitBuilder(); + builder.clearAppCollectors(); + List<AppCollectorsMapProto> protoList = + new ArrayList<AppCollectorsMapProto>(); + for (AppCollectorsMap m : this.collectorsList) { + protoList.add(convertToProtoFormat(m)); + } + builder.addAllAppCollectors(protoList); + } + + private void initLocalCollectorsList() { + ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? 
proto : builder; + List<AppCollectorsMapProto> list = + p.getAppCollectorsList(); + this.collectorsList = new ArrayList<AppCollectorsMap>(); + for (AppCollectorsMapProto m : list) { + this.collectorsList.add(convertFromProtoFormat(m)); + } + } + + @Override + public List<AppCollectorsMap> getAppCollectorsList() { + if (this.collectorsList == null) { + initLocalCollectorsList(); + } + return this.collectorsList; + } + + @Override + public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) { + maybeInitBuilder(); + if (appCollectorsList == null) { + builder.clearAppCollectors(); + } + this.collectorsList = appCollectorsList; + } + + private AppCollectorsMapPBImpl convertFromProtoFormat( + AppCollectorsMapProto p) { + return new AppCollectorsMapPBImpl(p); + } + + private AppCollectorsMapProto convertToProtoFormat( + AppCollectorsMap m) { + return ((AppCollectorsMapPBImpl) m).getProto(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java new file mode 100644 index 0000000..5f2a10a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class ReportNewCollectorInfoResponsePBImpl extends + ReportNewCollectorInfoResponse { + + private ReportNewCollectorInfoResponseProto proto = + ReportNewCollectorInfoResponseProto.getDefaultInstance(); + + private ReportNewCollectorInfoResponseProto.Builder builder = null; + + private boolean viaProto = false; + + public ReportNewCollectorInfoResponsePBImpl() { + builder = ReportNewCollectorInfoResponseProto.newBuilder(); + } + + public ReportNewCollectorInfoResponsePBImpl( + ReportNewCollectorInfoResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReportNewCollectorInfoResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java new file mode 100644 index 0000000..07e1d92 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+@Private
+public abstract class AppCollectorsMap {
+
+  public static AppCollectorsMap newInstance(
+      ApplicationId id, String collectorAddr) {
+    AppCollectorsMap appCollectorsMap =
+        Records.newRecord(AppCollectorsMap.class);
+    appCollectorsMap.setApplicationId(id);
+    appCollectorsMap.setCollectorAddr(collectorAddr);
+    return appCollectorsMap;
+  }
+
+  public abstract ApplicationId getApplicationId();
+
+  public abstract void setApplicationId(ApplicationId id);
+
+  public abstract String getCollectorAddr();
+
+  public abstract void setCollectorAddr(String addr);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
new file mode 100644
index 0000000..3740035
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
@@ -0,0 +1,152 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; + +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class AppCollectorsMapPBImpl extends AppCollectorsMap { + + private AppCollectorsMapProto proto = + AppCollectorsMapProto.getDefaultInstance(); + + private AppCollectorsMapProto.Builder builder = null; + private boolean viaProto = false; + + private ApplicationId appId = null; + private String collectorAddr = null; + + public AppCollectorsMapPBImpl() { + builder = AppCollectorsMapProto.newBuilder(); + } + + public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) { + this.proto = proto; + viaProto = true; + } + + public AppCollectorsMapProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ApplicationId getApplicationId() { + AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; + if (this.appId == null && p.hasAppId()) { + this.appId = convertFromProtoFormat(p.getAppId()); + } + return this.appId; + } + + @Override + public String getCollectorAddr() { + AppCollectorsMapProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.collectorAddr == null + && p.hasAppCollectorAddr()) { + this.collectorAddr = p.getAppCollectorAddr(); + } + return this.collectorAddr; + } + + @Override + public void setApplicationId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) { + builder.clearAppId(); + } + this.appId = id; + } + + @Override + public void setCollectorAddr(String collectorAddr) { + maybeInitBuilder(); + if (collectorAddr == null) { + builder.clearAppCollectorAddr(); + } + this.collectorAddr = collectorAddr; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AppCollectorsMapProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.appId != null) { + builder.setAppId(convertToProtoFormat(this.appId)); + } + if (this.collectorAddr != null) { + builder.setAppCollectorAddr(this.collectorAddr); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java index 0d5540d..eadb5b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -27,10 +27,20 @@ public class ContainerMetricsConstants { public static final String ENTITY_TYPE = "YARN_CONTAINER"; + // Event of this type will be emitted by NM. public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED"; + // Event of this type will be emitted by RM. + public static final String CREATED_IN_RM_EVENT_TYPE = + "YARN_RM_CONTAINER_CREATED"; + + // Event of this type will be emitted by NM. public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED"; + // Event of this type will be emitted by RM. + public static final String FINISHED_IN_RM_EVENT_TYPE = + "YARN_RM_CONTAINER_FINISHED"; + public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT"; public static final String ALLOCATED_MEMORY_ENTITY_INFO = @@ -59,4 +69,12 @@ public class ContainerMetricsConstants { public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO = "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS"; + + // Event of this type will be emitted by NM. + public static final String LOCALIZATION_START_EVENT_TYPE = + "YARN_NM_CONTAINER_LOCALIZATION_STARTED"; + + // Event of this type will be emitted by NM. 
+  public static final String LOCALIZATION_FINISHED_EVENT_TYPE =
+      "YARN_NM_CONTAINER_LOCALIZATION_FINISHED";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
new file mode 100644
index 0000000..8665274
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CollectorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+service CollectorNodemanagerProtocolService {
+  rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
+  rpc getTimelineCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 3e3cb82..3660252 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -84,6 +84,7 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
+  repeated AppCollectorsMapProto registered_collectors = 6;
 }

 message LogAggregationReportProto {
@@ -108,6 +109,7 @@ message NodeHeartbeatResponseProto {
   repeated SignalContainerRequestProto containers_to_signal = 13;
   optional
ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; + repeated AppCollectorsMapProto app_collectors_map = 16; } message ContainerQueuingLimitProto { @@ -120,6 +122,35 @@ message SystemCredentialsForAppsProto { optional bytes credentialsForApp = 2; } +//////////////////////////////////////////////////////////////////////// +////// From collector_nodemanager_protocol //////////////////////////// +//////////////////////////////////////////////////////////////////////// +message AppCollectorsMapProto { + optional ApplicationIdProto appId = 1; + optional string appCollectorAddr = 2; +} + +////////////////////////////////////////////////////// +/////// collector_nodemanager_protocol ////////////// +////////////////////////////////////////////////////// +message ReportNewCollectorInfoRequestProto { + repeated AppCollectorsMapProto app_collectors = 1; +} + +message ReportNewCollectorInfoResponseProto { +} + +message GetTimelineCollectorContextRequestProto { + optional ApplicationIdProto appId = 1; +} + +message GetTimelineCollectorContextResponseProto { + optional string user_id = 1; + optional string flow_name = 2; + optional string flow_version = 3; + optional int64 flow_run_id = 4; +} + message NMContainerStatusProto { optional ContainerIdProto container_id = 1; optional ContainerStateProto container_state = 2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java new file mode 100644 index 0000000..e25f528 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -0,0 +1,416 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestRPC { + + private static final String EXCEPTION_MSG = "test error"; + private static final String EXCEPTION_CAUSE = "exception cause"; + private static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + + public static final String ILLEGAL_NUMBER_MESSAGE = + "collectors' number in 
ReportNewCollectorInfoRequest is not ONE."; + + public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0"; + + public static final ApplicationId DEFAULT_APP_ID = + ApplicationId.newInstance(0, 0); + + @Test + public void testUnknownCall() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class + .getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + + // Any unrelated protocol would do + ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), + conf); + + try { + proxy.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + Assert.fail("Excepted RPC call to fail with unknown method."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().matches( + "Unknown method getNewApplication called on.*" + + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" + + "\\$ApplicationClientProtocolService\\$BlockingInterface " + + "protocol.")); + } catch (Exception e) { + e.printStackTrace(); + } finally { + server.stop(); + } + } + + @Test + public void testRPCOnCollectorNodeManagerProtocol() throws IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class + .getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(CollectorNodemanagerProtocol.class, + new DummyNMCollectorService(), addr, conf, null, 1); + server.start(); + + // Test unrelated protocol wouldn't get response + ApplicationClientProtocol unknownProxy = + (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), + conf); + + try { + unknownProxy.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + Assert.fail("Excepted RPC call to fail with unknown method."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().matches( + "Unknown method getNewApplication called on.*" + + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" + + "\\$ApplicationClientProtocolService\\$BlockingInterface " + + "protocol.")); + } catch (Exception e) { + e.printStackTrace(); + } + + // Test CollectorNodemanagerProtocol get proper response + CollectorNodemanagerProtocol proxy = + (CollectorNodemanagerProtocol)rpc.getProxy( + CollectorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), + conf); + // Verify request with DEFAULT_APP_ID and DEFAULT_COLLECTOR_ADDR get + // normally response. 
+ try { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance( + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR); + proxy.reportNewCollectorInfo(request); + } catch (YarnException e) { + Assert.fail("RPC call failured is not expected here."); + } + + // Verify empty request get YarnException back (by design in + // DummyNMCollectorService) + try { + proxy.reportNewCollectorInfo(Records + .newRecord(ReportNewCollectorInfoRequest.class)); + Assert.fail("Excepted RPC call to fail with YarnException."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE)); + } + + // Verify request with a valid app ID + try { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance( + ApplicationId.newInstance(0, 1)); + GetTimelineCollectorContextResponse response = + proxy.getTimelineCollectorContext(request); + Assert.assertEquals("test_user_id", response.getUserId()); + Assert.assertEquals("test_flow_name", response.getFlowName()); + Assert.assertEquals("test_flow_version", response.getFlowVersion()); + Assert.assertEquals(12345678L, response.getFlowRunId()); + } catch (YarnException | IOException e) { + Assert.fail("RPC call failured is not expected here."); + } + + // Verify request with an invalid app ID + try { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance( + ApplicationId.newInstance(0, 2)); + proxy.getTimelineCollectorContext(request); + Assert.fail("RPC call failured is expected here."); + } catch (YarnException | IOException e) { + Assert.assertTrue(e instanceof YarnException); + Assert.assertTrue(e.getMessage().contains( + "The application is not found.")); + } + server.stop(); + } + + @Test + public void testHadoopProtoRPC() throws Exception { + test(HadoopYarnProtoRPC.class.getName()); + } + + private void test(String rpcClass) throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, + ProtobufRpcEngine.class); + ContainerManagementProtocol proxy = (ContainerManagementProtocol) + rpc.getProxy(ContainerManagementProtocol.class, + NetUtils.getConnectAddress(server), conf); + ContainerLaunchContext containerLaunchContext = + RECORD_FACTORY.newRecordInstance(ContainerLaunchContext.class); + + ApplicationId applicationId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + ContainerId containerId = + ContainerId.newContainerId(applicationAttemptId, 100); + NodeId nodeId = NodeId.newInstance("localhost", 1234); + Resource resource = Resource.newInstance(1234, 2); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(containerId, "localhost", "user", + resource, System.currentTimeMillis() + 10000, 42, 42, + Priority.newInstance(0), 0); + Token containerToken = newContainerToken(nodeId, "password".getBytes(), + containerTokenIdentifier); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + containerToken); + List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); + 
list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + proxy.startContainers(allRequests); + + List<ContainerId> containerIds = new ArrayList<ContainerId>(); + containerIds.add(containerId); + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + GetContainerStatusesResponse response = + proxy.getContainerStatuses(gcsRequest); + List<ContainerStatus> statuses = response.getContainerStatuses(); + + //test remote exception + boolean exception = false; + try { + StopContainersRequest stopRequest = + RECORD_FACTORY.newRecordInstance(StopContainersRequest.class); + stopRequest.setContainerIds(containerIds); + proxy.stopContainers(stopRequest); + } catch (YarnException e) { + exception = true; + Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG)); + Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE)); + System.out.println("Test Exception is " + e.getMessage()); + } catch (Exception ex) { + ex.printStackTrace(); + } finally { + server.stop(); + } + Assert.assertTrue(exception); + Assert.assertNotNull(statuses.get(0)); + Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState()); + } + + public class DummyContainerManager implements ContainerManagementProtocol { + + private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>(); + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) + throws YarnException { + GetContainerStatusesResponse response = + RECORD_FACTORY.newRecordInstance(GetContainerStatusesResponse.class); + response.setContainerStatuses(statuses); + return response; + } + + @Override + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException { + StartContainersResponse response = + RECORD_FACTORY.newRecordInstance(StartContainersResponse.class); + for (StartContainerRequest request : + requests.getStartContainerRequests()) { + Token containerToken = request.getContainerToken(); + ContainerTokenIdentifier tokenId = null; + + try { + tokenId = newContainerTokenIdentifier(containerToken); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + ContainerStatus status = + RECORD_FACTORY.newRecordInstance(ContainerStatus.class); + status.setState(ContainerState.RUNNING); + status.setContainerId(tokenId.getContainerID()); + status.setExitStatus(0); + statuses.add(status); + + } + return response; + } + + @Override + public StopContainersResponse stopContainers(StopContainersRequest request) + throws YarnException { + Exception e = new Exception(EXCEPTION_MSG, + new Exception(EXCEPTION_CAUSE)); + throw new YarnException(e); + } + + @Override + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) + throws YarnException, IOException { + return null; + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException { + final Exception e = new Exception(EXCEPTION_MSG, + new Exception(EXCEPTION_CAUSE)); + throw new YarnException(e); + } + } + + public static ContainerTokenIdentifier newContainerTokenIdentifier( + Token containerToken) throws IOException { + org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token = + new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>( + containerToken.getIdentifier() + .array(), containerToken.getPassword().array(), new Text( + containerToken.getKind()), + new 
Text(containerToken.getService())); + return token.decodeIdentifier(); + } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = + NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = + Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + // A dummy implementation for CollectorNodemanagerProtocol for test purpose, + // it only can accept one appID, collectorAddr pair or throw exceptions + public class DummyNMCollectorService + implements CollectorNodemanagerProtocol { + + @Override + public ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) + throws YarnException, IOException { + List<AppCollectorsMap> appCollectors = request.getAppCollectorsList(); + if (appCollectors.size() == 1) { + // check default appID and collectorAddr + AppCollectorsMap appCollector = appCollectors.get(0); + Assert.assertEquals(appCollector.getApplicationId(), + DEFAULT_APP_ID); + Assert.assertEquals(appCollector.getCollectorAddr(), + DEFAULT_COLLECTOR_ADDR); + } else { + throw new YarnException(ILLEGAL_NUMBER_MESSAGE); + } + + ReportNewCollectorInfoResponse response = + RECORD_FACTORY.newRecordInstance( + ReportNewCollectorInfoResponse.class); + return response; + } + + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + if (request.getApplicationId().getId() == 1) { + return GetTimelineCollectorContextResponse.newInstance( + "test_user_id", "test_flow_name", "test_flow_version", 12345678L); + } else { + throw new YarnException("The application is not found."); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 59278ef..be350e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -25,7 +25,9 @@ import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,11 +109,14 @@ public class TestYarnServerApiClasses { original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); original.setNodeLabels(getValidNodeLabels()); + Map<ApplicationId, String> collectors = getCollectors(); 
+ original.setRegisteredCollectors(collectors); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); + assertEquals(collectors, copy.getRegisteredCollectors()); // check labels are coming with valid values Assert.assertTrue(original.getNodeLabels() .containsAll(copy.getNodeLabels())); @@ -148,6 +153,8 @@ public class TestYarnServerApiClasses { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); original.setResponseId(100); + Map<ApplicationId, String> collectors = getCollectors(); + original.setAppCollectorsMap(collectors); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -157,6 +164,7 @@ public class TestYarnServerApiClasses { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); + assertEquals(collectors, copy.getAppCollectorsMap()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); } @@ -336,6 +344,15 @@ public class TestYarnServerApiClasses { return nodeLabels; } + private Map<ApplicationId, String> getCollectors() { + ApplicationId appID = ApplicationId.newInstance(1L, 1); + String collectorAddr = "localhost:0"; + Map<ApplicationId, String> collectorMap = + new HashMap<ApplicationId, String>(); + collectorMap.put(appID, collectorAddr); + return collectorMap; + } + private ContainerStatus getContainerStatus(int applicationId, int containerID, int appAttemptId) { ContainerStatus status = recordFactory http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index cfcf1bd..131eaa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import 
org.apache.hadoop.yarn.server.security.ApplicationACLsManager; /** @@ -71,6 +73,13 @@ public interface Context { Map<ApplicationId, Credentials> getSystemCredentialsForApps(); + /** + * Get the registered collectors that located on this NM. + * @return registered collectors, or null if the timeline service v.2 is not + * enabled + */ + Map<ApplicationId, String> getRegisteredCollectors(); + ConcurrentMap<ContainerId, Container> getContainers(); ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> @@ -94,6 +103,8 @@ public interface Context { boolean getDecommissioned(); + Configuration getConf(); + void setDecommissioned(boolean isDecommissioned); ConcurrentLinkedQueue<LogAggregationReport> @@ -111,4 +122,8 @@ public interface Context { boolean isDistributedSchedulingEnabled(); OpportunisticContainerAllocator getContainerAllocator(); + + void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher); + + NMTimelinePublisher getNMTimelinePublisher(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index f7d226e..5bfbb8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -98,6 +100,8 @@ public class NodeManager extends CompositeService private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + // the NM collector service is set only if the timeline service v.2 is enabled + private NMCollectorService nmCollectorService; private NodeStatusUpdater 
nodeStatusUpdater; private NodeResourceMonitor nodeResourceMonitor; private static CompositeServiceShutdownHook nodeManagerShutdownHook; @@ -183,6 +187,10 @@ public class NodeManager extends CompositeService } } + protected NMCollectorService createNMCollectorService(Context ctxt) { + return new NMCollectorService(ctxt); + } + protected WebServer createWebServer(Context nmContext, ResourceView resourceView, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { @@ -196,9 +204,10 @@ public class NodeManager extends CompositeService protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService stateStore, boolean isDistSchedulerEnabled) { + NMStateStoreService stateStore, boolean isDistSchedulerEnabled, + Configuration conf) { return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled); + dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf); } protected void doSecureLogin() throws IOException { @@ -331,7 +340,7 @@ public class NodeManager extends CompositeService YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager, nmStore, isDistSchedulingEnabled); + nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf); nodeLabelsProvider = createNodeLabelsProvider(conf); @@ -374,6 +383,11 @@ public class NodeManager extends CompositeService DefaultMetricsSystem.initialize("NodeManager"); + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + this.nmCollectorService = createNMCollectorService(context); + addService(nmCollectorService); + } + // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. 
addService(nodeStatusUpdater); @@ -457,6 +471,9 @@ public class NodeManager extends CompositeService public static class NMContext implements Context { private NodeId nodeId = null; + + private Configuration conf = null; + protected final ConcurrentMap<ApplicationId, Application> applications = new ConcurrentHashMap<ApplicationId, Application>(); @@ -466,6 +483,8 @@ public class NodeManager extends CompositeService protected final ConcurrentMap<ContainerId, Container> containers = new ConcurrentSkipListMap<ContainerId, Container>(); + private Map<ApplicationId, String> registeredCollectors; + protected final ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> increasedContainers = new ConcurrentHashMap<>(); @@ -490,10 +509,16 @@ public class NodeManager extends CompositeService private final QueuingContext queuingContext; + private NMTimelinePublisher nmTimelinePublisher; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, - NMStateStoreService stateStore, boolean isDistSchedulingEnabled) { + NMStateStoreService stateStore, boolean isDistSchedulingEnabled, + Configuration conf) { + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + this.registeredCollectors = new ConcurrentHashMap<>(); + } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -506,6 +531,7 @@ public class NodeManager extends CompositeService LogAggregationReport>(); this.queuingContext = new QueuingNMContext(); this.isDistSchedulingEnabled = isDistSchedulingEnabled; + this.conf = conf; } /** @@ -527,6 +553,11 @@ public class NodeManager extends CompositeService } @Override + public Configuration getConf() { + return this.conf; + } + + @Override public ConcurrentMap<ContainerId, Container> getContainers() { return this.containers; } @@ -645,6 +676,31 @@ public class NodeManager extends CompositeService public OpportunisticContainerAllocator getContainerAllocator() { return containerAllocator; } + + @Override + public Map<ApplicationId, String> getRegisteredCollectors() { + return this.registeredCollectors; + } + + public void addRegisteredCollectors( + Map<ApplicationId, String> newRegisteredCollectors) { + if (registeredCollectors != null) { + this.registeredCollectors.putAll(newRegisteredCollectors); + } else { + LOG.warn("Collectors are added before the registered collectors map " + + "is initialized; the timeline service v.2 may be disabled"); + } + } + + @Override + public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) { + this.nmTimelinePublisher = nmMetricsPublisher; + } + + @Override + public NMTimelinePublisher getNMTimelinePublisher() { + return nmTimelinePublisher; + } } /** @@ -744,9 +800,22 @@ public class NodeManager extends CompositeService return this.context; } + /** + * Returns the NM collector service. It should be used only for testing + * purposes.
+ * + * @return the NM collector service, or null if the timeline service v.2 is + * not enabled + */ + @VisibleForTesting + NMCollectorService getNMCollectorService() { + return this.nmCollectorService; + } + public static void main(String[] args) throws IOException { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); + @SuppressWarnings("resource") NodeManager nodeManager = new NodeManager(); Configuration conf = new YarnConfiguration(); new GenericOptionsParser(conf, args); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 7064fa3..e4a7903 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -81,11 +81,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -813,7 +815,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements .getContainerTokenSecretManager().getCurrentKey(), NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey(), - nodeLabelsForHeartbeat); + nodeLabelsForHeartbeat, + NodeStatusUpdaterImpl.this.context.getRegisteredCollectors()); if (logAggregationEnabled) { // pull log aggregation status for application running in this NM @@ -905,6 +908,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements newResource.toString()); } } + if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { + updateTimelineClientsAddress(response); + } + } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( @@ -932,6 +939,44 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements } } + + private void updateTimelineClientsAddress( + NodeHeartbeatResponse response) { + Map<ApplicationId, String> knownCollectorsMap = + response.getAppCollectorsMap(); + if (knownCollectorsMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No collectors to update RM"); + } + } else { + Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = + knownCollectorsMap.entrySet(); + for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) { + ApplicationId appId = entry.getKey(); + String collectorAddr = entry.getValue(); + + // Only handle applications running on local node. + // Not include apps with timeline collectors running in local + Application application = context.getApplications().get(appId); + // TODO this logic could be problematic if the collector address + // gets updated due to NM restart or collector service failure + if (application != null && + !context.getRegisteredCollectors().containsKey(appId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync a new collector address: " + collectorAddr + + " for application: " + appId + " from RM."); + } + NMTimelinePublisher nmTimelinePublisher = + context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress( + application.getAppId(), collectorAddr); + } + } + } + } + } + private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java new file mode 100644 index 0000000..d667c0e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.collectormanager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; + +/** + * Service that handles collector information. It is used only if the timeline + * service v.2 is enabled. + */ +public class NMCollectorService extends CompositeService implements + CollectorNodemanagerProtocol { + + private static final Log LOG = LogFactory.getLog(NMCollectorService.class); + + private final Context context; + + private Server server; + + public NMCollectorService(Context context) { + super(NMCollectorService.class.getName()); + this.context = context; + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + + InetSocketAddress collectorServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); + + Configuration serverConf = new Configuration(conf); + + // TODO Security settings. + YarnRPC rpc = YarnRPC.create(conf); + + server = + rpc.getServer(CollectorNodemanagerProtocol.class, this, + collectorServerAddress, serverConf, + this.context.getNMTokenSecretManager(), + conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT)); + + server.start(); + collectorServerAddress = conf.updateConnectAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + server.getListenerAddress()); + // start remaining services + super.serviceStart(); + LOG.info("NMCollectorService started at " + collectorServerAddress); + } + + + @Override + public void serviceStop() throws Exception { + if (server != null) { + server.stop(); + } + // TODO may cleanup app collectors running on this NM in future. 
+ super.serviceStop(); + } + + @Override + public ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) throws YarnException, IOException { + List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList(); + if (newCollectorsList != null && !newCollectorsList.isEmpty()) { + Map<ApplicationId, String> newCollectorsMap = + new HashMap<ApplicationId, String>(); + for (AppCollectorsMap collector : newCollectorsList) { + ApplicationId appId = collector.getApplicationId(); + String collectorAddr = collector.getCollectorAddr(); + newCollectorsMap.put(appId, collectorAddr); + // set registered collector address to TimelineClient. + NMTimelinePublisher nmTimelinePublisher = + context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr); + } + } + ((NodeManager.NMContext)context).addRegisteredCollectors( + newCollectorsMap); + } + + return ReportNewCollectorInfoResponse.newInstance(); + } + + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + Application app = context.getApplications().get(request.getApplicationId()); + if (app == null) { + throw new YarnException("Application " + request.getApplicationId() + + " doesn't exist on NM."); + } + return GetTimelineCollectorContextResponse.newInstance( + app.getUser(), app.getFlowName(), app.getFlowVersion(), + app.getFlowRunId()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java new file mode 100644 index 0000000..7bf597b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.nodemanager.collectormanager contains + * classes for handling timeline collector information. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.collectormanager; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability;
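
For readers tracing the patch, here is a minimal, hypothetical sketch (not part of commit bb0a24ca) of how the pieces above fit together: an NMContext constructed with the timeline service v.2 enabled, and a collector address registered the way NMCollectorService#reportNewCollectorInfo does before NodeStatusUpdaterImpl ships the map to the RM in the next heartbeat. The class name, the use of NMNullStateStoreService, the null dirsHandler, and the configuration keys used to enable v.2 are assumptions made for illustration only; verify them against the tree this patch is applied to.

// Hypothetical sketch only; not part of this commit.
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

public class RegisteredCollectorsSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Assumed keys for turning on the timeline service v.2.
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);

    // The patched NMContext constructor now takes the Configuration so it
    // can decide whether to create the registered-collectors map.
    NMContext context = new NMContext(
        new NMContainerTokenSecretManager(conf),
        new NMTokenSecretManagerInNM(),
        null /* dirsHandler: not needed for this sketch */,
        new ApplicationACLsManager(conf),
        new NMNullStateStoreService(),
        false /* isDistSchedulingEnabled */,
        conf);

    // Register a collector address for an application, the way
    // NMCollectorService#reportNewCollectorInfo does.
    ApplicationId appId = ApplicationId.newInstance(1L, 1);
    Map<ApplicationId, String> collectors =
        new HashMap<ApplicationId, String>();
    collectors.put(appId, "localhost:0");
    context.addRegisteredCollectors(collectors);

    // The map is non-null only because v.2 was enabled above; with v.2
    // disabled, getRegisteredCollectors() returns null and the call above
    // would only log a warning.
    System.out.println(context.getRegisteredCollectors().get(appId));
  }
}

In the patch itself, this same map is what NodeStatusUpdaterImpl passes when building the heartbeat (context.getRegisteredCollectors()), which is how the RM learns about collectors running on this node.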