This is an automated email from the ASF dual-hosted git repository. jlowe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 6a92346 YARN-6523. Optimize system credentials sent in node heartbeat responses. Contributed by Manikandan R 6a92346 is described below commit 6a923464afa6b635f505be5d5b2414d47d570f99 Author: Jason Lowe <jl...@apache.org> AuthorDate: Tue Jan 8 16:51:06 2019 -0600 YARN-6523. Optimize system credentials sent in node heartbeat responses. Contributed by Manikandan R --- .../yarn/api/records/impl/pb/ProtoUtils.java | 18 +++ .../api/protocolrecords/NodeHeartbeatRequest.java | 4 + .../api/protocolrecords/NodeHeartbeatResponse.java | 21 +-- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 13 ++ .../impl/pb/NodeHeartbeatResponsePBImpl.java | 91 ++++-------- .../yarn/server/utils/YarnServerBuilderUtils.java | 68 +++++++++ .../proto/yarn_server_common_service_protos.proto | 2 + .../hadoop/yarn/TestYarnServerApiClasses.java | 55 +++++++- .../api/protocolrecords/TestProtocolRecords.java | 21 +-- .../server/nodemanager/NodeStatusUpdaterImpl.java | 9 +- .../server/nodemanager/TestNodeStatusUpdater.java | 32 ++++- .../resourcemanager/RMActiveServiceContext.java | 29 +++- .../yarn/server/resourcemanager/RMContext.java | 11 +- .../yarn/server/resourcemanager/RMContextImpl.java | 15 +- .../resourcemanager/ResourceTrackerService.java | 33 ++++- .../security/DelegationTokenRenewer.java | 17 ++- .../hadoop/yarn/server/resourcemanager/MockNM.java | 3 + .../yarn/server/resourcemanager/MockNodes.java | 47 +++++- .../TestResourceTrackerService.java | 157 +++++++++++++++++++++ .../security/TestDelegationTokenRenewer.java | 110 +++++++++++++-- 20 files changed, 632 insertions(+), 124 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 4008a97..f175cf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementCon import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; @@ -92,12 +94,16 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto; import org.apache.hadoop.yarn.server.api.ContainerType; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.google.protobuf.ByteString; @Private @Unstable public class ProtoUtils { + public static final Interner<ByteString> BYTE_STRING_INTERNER = + Interners.newWeakInterner(); /* * ContainerState @@ -578,6 +584,18 @@ public class ProtoUtils { TimedPlacementConstraintProto.DelayUnit u) { return TimedPlacementConstraint.DelayUnit.valueOf(u.name()); } + + /* + * ApplicationId + */ + public static ApplicationIdPBImpl convertFromProtoFormat( + ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + public static ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 4f99225..1b4339b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -102,4 +102,8 @@ public abstract class NodeHeartbeatRequest { public abstract Set<NodeAttribute> getNodeAttributes(); public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index acb7644..a177cf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -28,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -88,13 +88,6 @@ public abstract class NodeHeartbeatResponse { public abstract void setDiagnosticsMessage(String diagnosticsMessage); - // Credentials (i.e. hdfs tokens) needed by NodeManagers for application - // localizations and logAggreations. - public abstract Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps(); - - public abstract void setSystemCredentialsForApps( - Map<ApplicationId, ByteBuffer> systemCredentials); - public abstract boolean getAreNodeLabelsAcceptedByRM(); public abstract void setAreNodeLabelsAcceptedByRM( @@ -123,4 +116,16 @@ public abstract class NodeHeartbeatResponse { public abstract void setAreNodeAttributesAcceptedByRM( boolean areNodeAttributesAcceptedByRM); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); + + // Credentials (i.e. hdfs tokens) needed by NodeManagers for application + // localizations and logAggregations. + public abstract void setSystemCredentialsForApps( + Collection<SystemCredentialsForAppsProto> systemCredentials); + + public abstract Collection<SystemCredentialsForAppsProto> + getSystemCredentialsForApps(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index c59127a..7d96c8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -455,4 +455,17 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index a53ea4d..7fe9bf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -26,10 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.protobuf.ByteString; - import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; @@ -39,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; @@ -76,7 +70,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private List<ContainerId> containersToCleanup = null; private List<ContainerId> containersToBeRemovedFromNM = null; private List<ApplicationId> applicationsToCleanup = null; - private Map<ApplicationId, ByteBuffer> systemCredentials = null; private Resource resource = null; private Map<ApplicationId, AppCollectorData> appCollectorsMap = null; @@ -88,9 +81,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private List<Container> containersToDecrease = null; private List<SignalContainerRequest> containersToSignal = null; - private static final Interner<ByteString> BYTE_STRING_INTERNER = - Interners.newWeakInterner(); - public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -129,9 +119,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { builder.setContainerQueuingLimit( convertToProtoFormat(this.containerQueuingLimit)); } - if (this.systemCredentials != null) { - addSystemCredentialsToProto(); - } if (this.containersToUpdate != null) { addContainersToUpdateToProto(); } @@ -149,17 +136,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } } - private void addSystemCredentialsToProto() { - maybeInitBuilder(); - builder.clearSystemCredentialsForApps(); - for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) { - builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) - .setCredentialsForApp(BYTE_STRING_INTERNER.intern( - ProtoUtils.convertToProtoFormat(entry.getValue().duplicate())))); - } - } - private void addAppCollectorsMapToProto() { maybeInitBuilder(); builder.clearAppCollectors(); @@ -168,7 +144,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { AppCollectorData data = entry.getValue(); AppCollectorDataProto.Builder appCollectorDataBuilder = AppCollectorDataProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) + .setAppId(ProtoUtils.convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion()); @@ -477,7 +453,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { this.applicationsToCleanup = new ArrayList<ApplicationId>(); for (ApplicationIdProto c : list) { - this.applicationsToCleanup.add(convertFromProtoFormat(c)); + this.applicationsToCleanup.add(ProtoUtils.convertFromProtoFormat(c)); } } @@ -510,7 +486,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { @Override public ApplicationIdProto next() { - return convertToProtoFormat(iter.next()); + return ProtoUtils.convertToProtoFormat(iter.next()); } @Override @@ -645,15 +621,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } @Override - public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() { - if (this.systemCredentials != null) { - return this.systemCredentials; - } - initSystemCredentials(); - return systemCredentials; - } - - @Override public Map<ApplicationId, AppCollectorData> getAppCollectors() { if (this.appCollectorsMap != null) { return this.appCollectorsMap; @@ -662,24 +629,13 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { return appCollectorsMap; } - private void initSystemCredentials() { - NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList(); - this.systemCredentials = new HashMap<ApplicationId, ByteBuffer> (); - for (SystemCredentialsForAppsProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp()); - this.systemCredentials.put(appId, byteBuffer); - } - } - private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; List<AppCollectorDataProto> list = p.getAppCollectorsList(); if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId()); Token collectorToken = null; if (c.hasAppCollectorToken()){ collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); @@ -694,13 +650,19 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { @Override public void setSystemCredentialsForApps( - Map<ApplicationId, ByteBuffer> systemCredentials) { - if (systemCredentials == null || systemCredentials.isEmpty()) { - return; - } + Collection<SystemCredentialsForAppsProto> systemCredentialsForAppsProto) { maybeInitBuilder(); - this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>(); - this.systemCredentials.putAll(systemCredentials); + builder.clearSystemCredentialsForApps(); + if (systemCredentialsForAppsProto != null) { + builder.addAllSystemCredentialsForApps(systemCredentialsForAppsProto); + } + } + + @Override + public Collection<SystemCredentialsForAppsProto> + getSystemCredentialsForApps() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + return p.getSystemCredentialsForAppsList(); } @Override @@ -742,14 +704,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { return ProtoUtils.convertToProtoFormat(t); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - private NodeAction convertFromProtoFormat(NodeActionProto p) { return NodeAction.valueOf(p.name()); } @@ -889,5 +843,18 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private TokenPBImpl convertFromProtoFormat(TokenProto p) { return new TokenPBImpl(p); } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java index f333185..9ee68d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java @@ -18,12 +18,19 @@ package org.apache.hadoop.yarn.server.utils; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -66,4 +73,65 @@ public class YarnServerBuilderUtils { } return response; } + + /** + * Build SystemCredentialsForAppsProto objects. + * + * @param applicationId Application ID + * @param credentials HDFS Tokens + * @return systemCredentialsForAppsProto SystemCredentialsForAppsProto + */ + public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto( + ApplicationId applicationId, ByteBuffer credentials) { + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + SystemCredentialsForAppsProto.newBuilder() + .setAppId(ProtoUtils.convertToProtoFormat(applicationId)) + .setCredentialsForApp(ProtoUtils.BYTE_STRING_INTERNER.intern( + ProtoUtils.convertToProtoFormat(credentials.duplicate()))) + .build(); + return systemCredentialsForAppsProto; + } + + /** + * Convert Collection of SystemCredentialsForAppsProto proto objects to a Map + * of ApplicationId to ByteBuffer. + * + * @param systemCredentials List of SystemCredentialsForAppsProto proto + * objects + * @return systemCredentialsForApps Map of Application Id to ByteBuffer + */ + public static Map<ApplicationId, ByteBuffer> convertFromProtoFormat( + Collection<SystemCredentialsForAppsProto> systemCredentials) { + + Map<ApplicationId, ByteBuffer> systemCredentialsForApps = + new HashMap<ApplicationId, ByteBuffer>(systemCredentials.size()); + for (SystemCredentialsForAppsProto proto : systemCredentials) { + systemCredentialsForApps.put( + ProtoUtils.convertFromProtoFormat(proto.getAppId()), + ProtoUtils.convertFromProtoFormat(proto.getCredentialsForApp())); + } + return systemCredentialsForApps; + } + + /** + * Convert Map of Application Id to ByteBuffer to Collection of + * SystemCredentialsForAppsProto proto objects. + * + * @param systemCredentialsForApps Map of Application Id to ByteBuffer + * @return systemCredentials List of SystemCredentialsForAppsProto proto + * objects + */ + public static List<SystemCredentialsForAppsProto> convertToProtoFormat( + Map<ApplicationId, ByteBuffer> systemCredentialsForApps) { + List<SystemCredentialsForAppsProto> systemCredentials = + new ArrayList<SystemCredentialsForAppsProto>( + systemCredentialsForApps.size()); + for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentialsForApps + .entrySet()) { + SystemCredentialsForAppsProto proto = + newSystemCredentialsForAppsProto(entry.getKey(), entry.getValue()); + systemCredentials.add(proto); + } + return systemCredentials; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index fb74237..b5a99b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -102,6 +102,7 @@ message NodeHeartbeatRequestProto { repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; optional NodeAttributesProto nodeAttributes = 7; + optional int64 tokenSequenceNo = 8; } message LogAggregationReportProto { @@ -131,6 +132,7 @@ message NodeHeartbeatResponseProto { // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; optional bool areNodeAttributesAcceptedByRM = 18 [default = false]; + optional int64 tokenSequenceNo = 19; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 8b1d0bb..eb9bce4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +30,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -56,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -153,10 +160,12 @@ public class TestYarnServerApiClasses { /** * Test NodeHeartbeatResponsePBImpl. + * + * @throws IOException */ @Test - public void testNodeHeartbeatResponsePBImpl() { + public void testNodeHeartbeatResponsePBImpl() throws IOException { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); original.setDiagnosticsMessage("testDiagnosticMessage"); @@ -168,6 +177,29 @@ public class TestYarnServerApiClasses { Map<ApplicationId, AppCollectorData> collectors = getCollectors(false); original.setAppCollectors(collectors); + // create token1 + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token<DelegationTokenIdentifier> expectedToken1 = + new Token<DelegationTokenIdentifier>(dtId1.getBytes(), + "password12".getBytes(), dtId1.getKind(), new Text("service1")); + + Credentials credentials1 = new Credentials(); + credentials1.addToken(expectedToken1.getService(), expectedToken1); + + DataOutputBuffer dob1 = new DataOutputBuffer(); + credentials1.writeTokenStorageToStream(dob1); + + ByteBuffer byteBuffer1 = + ByteBuffer.wrap(dob1.getData(), 0, dob1.getLength()); + + Map<ApplicationId, ByteBuffer> systemCredentials = + new HashMap<ApplicationId, ByteBuffer>(); + systemCredentials.put(getApplicationId(1), byteBuffer1); + original.setSystemCredentialsForApps( + YarnServerBuilderUtils.convertToProtoFormat(systemCredentials)); + NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); assertEquals(100, copy.getResponseId()); @@ -178,6 +210,22 @@ public class TestYarnServerApiClasses { assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); + assertEquals(1, copy.getSystemCredentialsForApps().size()); + + Credentials credentials1Out = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + ByteBuffer buffer = + YarnServerBuilderUtils + .convertFromProtoFormat(copy.getSystemCredentialsForApps()) + .get(getApplicationId(1)); + Assert.assertNotNull(buffer); + buffer.rewind(); + buf.reset(buffer); + credentials1Out.readTokenStorageStream(buf); + assertEquals(1, credentials1Out.getAllTokens().size()); + // Ensure token1's password "password12" is available from proto response + assertEquals(10, + credentials1Out.getAllTokens().iterator().next().getPassword().length); } @Test @@ -376,7 +424,8 @@ public class TestYarnServerApiClasses { AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); if (!hasNullCollectorToken) { data.setCollectorToken( - Token.newInstance(new byte[0], "kind", new byte[0], "s")); + org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0], + "kind", new byte[0], "s")); } Map<ApplicationId, AppCollectorData> collectorMap = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index e6e79d3..473557f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.Sets; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -38,28 +37,29 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.NodeAttribute; -import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; - import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb .NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; - import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestProtocolRecords { @Test @@ -154,14 +154,17 @@ public class TestProtocolRecords { DataOutputBuffer dob = new DataOutputBuffer(); app1Cred.writeTokenStorageToStream(dob); - ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); - record.setSystemCredentialsForApps(appCredentials); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer); + record.setSystemCredentialsForApps( + YarnServerBuilderUtils.convertToProtoFormat(appCredentials)); NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( ((NodeHeartbeatResponsePBImpl) record).getProto()); - Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps()); + Assert.assertEquals(appCredentials, YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps())); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5975784..3fbe0c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvid import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.resource.Resources; @@ -158,6 +159,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; private NodeAttributesProvider nodeAttributesProvider; + private long tokenSequenceNo; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -1320,6 +1322,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + request.setTokenSequenceNo( + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1360,7 +1364,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); } Map<ApplicationId, ByteBuffer> systemCredentials = - response.getSystemCredentialsForApps(); + YarnServerBuilderUtils.convertFromProtoFormat( + response.getSystemCredentialsForApps()); if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context).setSystemCrendentialsForApps( parseCredentials(systemCredentials)); @@ -1404,6 +1409,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements updateTimelineCollectorData(response); } + NodeStatusUpdaterImpl.this.tokenSequenceNo = + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 8435340..5ba0bef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.EOFException; @@ -79,6 +78,8 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl; +import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; @@ -112,6 +113,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater extends NodeManagerTestBase { @@ -325,16 +328,28 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(context, dispatcher, healthChecker, metrics); - resourceTracker = new MyResourceTracker4(context); + InetSocketAddress address = new InetSocketAddress(0); + Configuration configuration = new Configuration(); + Server server = RpcServerFactoryPBImpl.get().getServer( + ResourceTracker.class, new MyResourceTracker4(context), address, + configuration, null, 1); + server.start(); + this.resourceTracker = (ResourceTracker) RpcClientFactoryPBImpl.get() + .getClient( + ResourceTracker.class, 1, NetUtils.getConnectAddress(server), + configuration); } @Override - protected ResourceTracker getRMClient() { + protected ResourceTracker getRMClient() throws IOException { return resourceTracker; } @Override protected void stopRMProxy() { + if (this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } return; } } @@ -780,7 +795,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); - nhResponse.setSystemCredentialsForApps(appCredentials); + nhResponse.setSystemCredentialsForApps( + YarnServerBuilderUtils.convertToProtoFormat(appCredentials)); return nhResponse; } @@ -1702,7 +1718,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { @Test public void testConcurrentAccessToSystemCredentials(){ - final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>(); + final Map<ApplicationId, ByteBuffer> testCredentials = + new HashMap<>(); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]); ApplicationId applicationId = ApplicationId.newInstance(123456, 120); testCredentials.put(applicationId, byteBuffer); @@ -1727,8 +1744,9 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { NodeHeartbeatResponse nodeHeartBeatResponse = newNodeHeartbeatResponse(0, NodeAction.NORMAL, null, null, null, null, 0); - nodeHeartBeatResponse.setSystemCredentialsForApps( - testCredentials); + nodeHeartBeatResponse + .setSystemCredentialsForApps(YarnServerBuilderUtils + .convertToProtoFormat(testCredentials)); NodeHeartbeatResponseProto proto = ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse) .getProto(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f829a4c..bcedb03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -81,8 +82,8 @@ public class RMActiveServiceContext { private final ConcurrentMap<NodeId, RMNode> inactiveNodes = new ConcurrentHashMap<NodeId, RMNode>(); - private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials = - new ConcurrentHashMap<ApplicationId, ByteBuffer>(); + private final ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto> systemCredentials = + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>(); private boolean isWorkPreservingRecoveryEnabled; @@ -124,6 +125,8 @@ public class RMActiveServiceContext { private ProxyCAManager proxyCAManager; private VolumeManager volumeManager; + private AtomicLong tokenSequenceNo = new AtomicLong(1); + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -509,7 +512,8 @@ public class RMActiveServiceContext { @Private @Unstable - public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() { + public ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto> + getSystemCredentialsForApps() { return systemCredentials; } @@ -583,4 +587,21 @@ public class RMActiveServiceContext { public void setVolumeManager(VolumeManager volumeManager) { this.volumeManager = volumeManager; } + + /** + * Get token sequence no. + * + * @return the tokenSequenceNo + */ + public Long getTokenSequenceNo() { + return tokenSequenceNo.get(); + } + + /** + * Increment token sequence no. + * + */ + public void incrTokenSequenceNo() { + this.tokenSequenceNo.incrementAndGet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 4e9846c..55420bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; @@ -29,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -72,8 +72,9 @@ public interface RMContext extends ApplicationMasterServiceContext { RMStateStore getStateStore(); ConcurrentMap<ApplicationId, RMApp> getRMApps(); - - ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps(); + + ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto> + getSystemCredentialsForApps(); ConcurrentMap<NodeId, RMNode> getInactiveRMNodes(); @@ -198,4 +199,8 @@ public interface RMContext extends ApplicationMasterServiceContext { VolumeManager getVolumeManager(); void setVolumeManager(VolumeManager volumeManager); + + long getTokenSequenceNo(); + + void incrTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ab71134..716e06f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -35,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -572,7 +572,8 @@ public class RMContextImpl implements RMContext { activeServiceContext.setSystemClock(clock); } - public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() { + public ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto> + getSystemCredentialsForApps() { return activeServiceContext.getSystemCredentialsForApps(); } @@ -666,4 +667,14 @@ public class RMContextImpl implements RMContext { public NodeAttributesManager getNodeAttributesManager() { return activeServiceContext.getNodeAttributesManager(); } + + @Override + public long getTokenSequenceNo() { + return this.activeServiceContext.getTokenSequenceNo(); + } + + @Override + public void incrTokenSequenceNo() { + this.activeServiceContext.incrTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 6f669c8..c50950b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,13 +21,11 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -639,11 +637,7 @@ public class ResourceTrackerService extends AbstractService implements populateKeys(request, nodeHeartBeatResponse); - ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials = - rmContext.getSystemCredentialsForApps(); - if (!systemCredentials.isEmpty()) { - nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); - } + populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineV2Enabled) { // Return collectors' map that NM needs to know @@ -952,4 +946,29 @@ public class ResourceTrackerService extends AbstractService implements public Server getServer() { return this.server; } + + private void populateTokenSequenceNo(NodeHeartbeatRequest request, + NodeHeartbeatResponse nodeHeartBeatResponse) { + if (LOG.isDebugEnabled()) { + LOG.debug("Token sequence no received from heartbeat request: " + + request.getTokenSequenceNo() + ". Current token sequeunce no: " + + this.rmContext.getTokenSequenceNo() + + ". System credentials for apps size: " + + rmContext.getSystemCredentialsForApps().size()); + } + if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) { + if (!rmContext.getSystemCredentialsForApps().isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Sending System credentials for apps as part of NodeHeartbeat " + + "response."); + } + nodeHeartBeatResponse + .setSystemCredentialsForApps( + rmContext.getSystemCredentialsForApps().values()); + } + } + nodeHeartBeatResponse.setTokenSequenceNo( + this.rmContext.getTokenSequenceNo()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a9f8cd1..de8c30b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -64,10 +64,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -637,7 +639,7 @@ public class DelegationTokenRenewer extends AbstractService { // Request new hdfs token if the token is about to expire, and remove the old // token from the tokenToRenew list - private void requestNewHdfsDelegationTokenIfNeeded( + void requestNewHdfsDelegationTokenIfNeeded( final DelegationTokenToRenew dttr) throws IOException, InterruptedException { @@ -679,6 +681,7 @@ public class DelegationTokenRenewer extends AbstractService { Collection<ApplicationId> referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + boolean incrTokenSequenceNo = false; if (!hasProxyUserPrivileges) { LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); return; @@ -703,14 +706,24 @@ public class DelegationTokenRenewer extends AbstractService { appTokens.get(applicationId).add(tokenToRenew); } LOG.info("Received new token " + token); + incrTokenSequenceNo = true; } } } + + if(incrTokenSequenceNo) { + this.rmContext.incrTokenSequenceNo(); + } + DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); for (ApplicationId applicationId : referringAppIds) { - rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto(applicationId, + byteBuffer); + rmContext.getSystemCredentialsForApps().put(applicationId, + systemCredentialsForAppsProto); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e28395..fe3a889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -68,6 +68,7 @@ public class MockNM { private Map<ApplicationId, AppCollectorData> registeringCollectors = new ConcurrentHashMap<>(); private Set<NodeLabel> nodeLabels; + private long tokenSequenceNo; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -278,6 +279,7 @@ public class MockNM { req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); req.setRegisteringCollectors(this.registeringCollectors); + req.setTokenSequenceNo(this.tokenSequenceNo); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); @@ -302,6 +304,7 @@ public class MockNM { capability = Resources.clone(newResource); } + this.tokenSequenceNo = heartbeatResponse.getTokenSequenceNo(); return heartbeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a871993..c0af041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -119,11 +119,13 @@ public class MockNodes { private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; private Resource physicalResource; + private RMContext rmContext; - public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, + MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, - long lastHealthReportTime, int cmdPort, String hostName, NodeState state, - Set<String> labels, ResourceUtilization containersUtilization, + long lastHealthReportTime, int cmdPort, String hostName, + NodeState state, Set<String> labels, + ResourceUtilization containersUtilization, ResourceUtilization nodeUtilization, Resource pPhysicalResource) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -141,6 +143,18 @@ public class MockNodes { this.physicalResource = pPhysicalResource; } + public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, + Resource perNode, String rackName, String healthReport, + long lastHealthReportTime, int cmdPort, String hostName, + NodeState state, Set<String> labels, + ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, Resource pPhysicalResource, + RMContext rmContext) { + this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport, + lastHealthReportTime, cmdPort, hostName, state, labels, + containersUtilization, nodeUtilization, pPhysicalResource); + this.rmContext = rmContext; + } @Override public NodeId getNodeID() { return this.nodeId; @@ -298,7 +312,7 @@ public class MockNodes { @Override public RMContext getRMContext() { - return null; + return this.rmContext; } @Override @@ -343,6 +357,26 @@ public class MockNodes { containersUtilization, nodeUtilization, physicalResource); } + private static RMNode buildRMNode(int rack, final Resource perNode, + NodeState state, String httpAddr, int hostnum, String hostName, int port, + Set<String> labels, ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, Resource physicalResource, + RMContext rmContext) { + final String rackName = "rack" + rack; + final int nid = hostnum; + final String nodeAddr = hostName + ":" + nid; + if (hostName == null) { + hostName = "host" + nid; + } + final NodeId nodeID = NodeId.newInstance(hostName, port); + + final String httpAddress = httpAddr; + String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; + return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, + healthReport, 0, nid, hostName, state, labels, containersUtilization, + nodeUtilization, physicalResource, rmContext); + } + public static RMNode nodeInfo(int rack, final Resource perNode, NodeState state) { return buildRMNode(rack, perNode, state, "N/A"); @@ -371,4 +405,9 @@ public class MockNodes { return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port); } + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, int port, RMContext rmContext) { + return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, + hostName, port, null, null, null, null, rmContext); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index d3db0d3..78828f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -21,12 +21,17 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.net.ServerSocketUtil; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -37,6 +42,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -47,6 +53,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -59,10 +67,13 @@ import javax.xml.transform.OutputKeys; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -83,9 +94,13 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.AttributeValue; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -105,6 +120,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -112,10 +128,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.junit.After; @@ -2891,4 +2911,141 @@ public class TestResourceTrackerService extends NodeLabelTestBase { public void close() { } } + + @Test(timeout = 5000) + public void testSystemCredentialsAfterTokenSequenceNoChange() + throws Exception { + + Configuration conf = new Configuration(); + + RMContext rmContext = mock(RMContextImpl.class); + + Dispatcher dispatcher = new InlineDispatcher(); + when(rmContext.getDispatcher()).thenReturn(dispatcher); + + NodeId nodeId = NodeId.newInstance("localhost", 1234); + ConcurrentMap<NodeId, RMNode> rmNodes = + new ConcurrentHashMap<NodeId, RMNode>(); + RMNode rmNode = MockNodes.newNodeInfo(1, Resource.newInstance(1024, 1), 1, + "localhost", 1234, rmContext); + rmNodes.put(nodeId, rmNode); + when(rmContext.getRMNodes()).thenReturn(rmNodes); + + ConcurrentMap<NodeId, RMNode> inactiveNodes = + new ConcurrentHashMap<NodeId, RMNode>(); + when(rmContext.getInactiveRMNodes()).thenReturn(inactiveNodes); + when(rmContext.getConfigurationProvider()) + .thenReturn(new LocalConfigurationProvider()); + + dispatcher.register(SchedulerEventType.class, + new InlineDispatcher.EmptyEventHandler()); + dispatcher.register(RMNodeEventType.class, + new NodeEventDispatcher(rmContext)); + + NMLivelinessMonitor nmLivelinessMonitor = + new NMLivelinessMonitor(dispatcher); + nmLivelinessMonitor.init(conf); + nmLivelinessMonitor.start(); + NodesListManager nodesListManager = new NodesListManager(rmContext); + nodesListManager.init(conf); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.start(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.start(); + ResourceTrackerService resourceTrackerService = new ResourceTrackerService( + rmContext, nodesListManager, nmLivelinessMonitor, + containerTokenSecretManager, nmTokenSecretManager); + + resourceTrackerService.init(conf); + resourceTrackerService.start(); + + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + RegisterNodeManagerRequest request = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + request.setNodeId(nodeId); + request.setHttpPort(1234); + request.setResource(BuilderUtils.newResource(1024, 1)); + resourceTrackerService.registerNodeManager(request); + + org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = + recordFactory.newRecordInstance( + org.apache.hadoop.yarn.server.api.records.NodeStatus.class); + nodeStatus.setNodeId(nodeId); + nodeStatus.setResponseId(0); + nodeStatus.setNodeHealthStatus( + recordFactory.newRecordInstance(NodeHealthStatus.class)); + nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true); + + NodeHeartbeatRequest request1 = + recordFactory.newRecordInstance(NodeHeartbeatRequest.class); + request1.setNodeStatus(nodeStatus); + + // Set NM's token sequence no as 1 + request1.setTokenSequenceNo(1); + + // Set RM's token sequence no as 1 + when(rmContext.getTokenSequenceNo()).thenReturn((long) 1); + + // Populate SystemCredentialsForApps + final ApplicationId appId = ApplicationId.newInstance(1234, 1); + Credentials app1Cred = new Credentials(); + + Token<DelegationTokenIdentifier> token = + new Token<DelegationTokenIdentifier>(); + token.setKind(new Text("kind1")); + app1Cred.addToken(new Text("token1"), token); + Token<DelegationTokenIdentifier> token2 = + new Token<DelegationTokenIdentifier>(); + token2.setKind(new Text("kind2")); + app1Cred.addToken(new Text("token2"), token2); + + DataOutputBuffer dob = new DataOutputBuffer(); + app1Cred.writeTokenStorageToStream(dob); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto(appId, + byteBuffer); + + ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto> systemCredentialsForApps = + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>(1); + + systemCredentialsForApps.put(appId, systemCredentialsForAppsProto); + + when(rmContext.getSystemCredentialsForApps()) + .thenReturn(systemCredentialsForApps); + + // first ping + NodeHeartbeatResponse response = + resourceTrackerService.nodeHeartbeat(request1); + + // Though SystemCredentialsForApps size is 1, it is not being sent as part + // of response as there is no difference between NM's and RM's token + // sequence no + assertEquals(1, rmContext.getTokenSequenceNo()); + assertEquals(1, rmContext.getSystemCredentialsForApps().size()); + assertEquals(1, response.getTokenSequenceNo()); + assertEquals(0, response.getSystemCredentialsForApps().size()); + + // Set RM's token sequence no as 2 + when(rmContext.getTokenSequenceNo()).thenReturn((long) 2); + + // Ensure new heartbeat has been sent to avoid duplicate issues + nodeStatus.setResponseId(1); + request1.setNodeStatus(nodeStatus); + + // second ping + NodeHeartbeatResponse response1 = + resourceTrackerService.nodeHeartbeat(request1); + + // Since NM's and RM's token sequence no is different, response should + // contain SystemCredentialsForApps + assertEquals(2, response1.getTokenSequenceNo()); + assertEquals(1, response1.getSystemCredentialsForApps().size()); + + resourceTrackerService.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b3..1a35563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -35,6 +35,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.concurrent.BlockingQueue; @@ -80,7 +81,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -95,12 +98,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -146,8 +151,13 @@ public class TestDelegationTokenRenewer { @Override public long renew(Token<?> t, Configuration conf) throws IOException { if ( !(t instanceof MyToken)) { - // renew in 3 seconds - return System.currentTimeMillis() + 3000; + if(conf.get("override_token_expire_time") != null) { + return System.currentTimeMillis() + + Long.parseLong(conf.get("override_token_expire_time")); + } else { + // renew in 3 seconds + return System.currentTimeMillis() + 3000; + } } MyToken token = (MyToken)t; if(token.isCanceled()) { @@ -201,6 +211,7 @@ public class TestDelegationTokenRenewer { counter = new AtomicInteger(0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("override_token_expire_time", "3000"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue<Event>(); dispatcher = new AsyncDispatcher(eventQueue); @@ -209,7 +220,7 @@ public class TestDelegationTokenRenewer { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap<ApplicationId, ByteBuffer>()); + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); when(mockContext.getDelegationTokenRenewer()).thenReturn( delegationTokenRenewer); when(mockContext.getDispatcher()).thenReturn(dispatcher); @@ -581,7 +592,7 @@ public class TestDelegationTokenRenewer { createNewDelegationTokenRenewer(lconf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap<ApplicationId, ByteBuffer>()); + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -661,7 +672,7 @@ public class TestDelegationTokenRenewer { createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap<ApplicationId, ByteBuffer>()); + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -766,7 +777,7 @@ public class TestDelegationTokenRenewer { createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap<ApplicationId, ByteBuffer>()); + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -825,7 +836,7 @@ public class TestDelegationTokenRenewer { createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap<ApplicationId, ByteBuffer>()); + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -890,7 +901,7 @@ public class TestDelegationTokenRenewer { } - @Test (timeout = 20000) + @Test(timeout = 30000) public void testReplaceExpiringDelegationToken() throws Exception { conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -969,8 +980,14 @@ public class TestDelegationTokenRenewer { new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1062,8 +1079,14 @@ public class TestDelegationTokenRenewer { new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1117,8 +1140,14 @@ public class TestDelegationTokenRenewer { new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1430,7 +1459,7 @@ public class TestDelegationTokenRenewer { DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap<ApplicationId, ByteBuffer>()); + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -1444,4 +1473,61 @@ public class TestDelegationTokenRenewer { delegationTokenRenewer.applicationFinished( BuilderUtils.newApplicationId(0, 1)); } + + @Test(timeout = 10000) + public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + final Credentials credsx = new Credentials(); + + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier( + new Text("user1"), new Text("renewer"), new Text("user1")); + final Token<DelegationTokenIdentifier> expectedToken = + new Token<DelegationTokenIdentifier>(dtId1.getBytes(), + "password2".getBytes(), dtId1.getKind(), new Text("service2")); + + // fire up the renewer + final DelegationTokenRenewer dtr = new DelegationTokenRenewer() { + @Override + protected Token<?>[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(expectedToken.getService(), expectedToken); + return new Token<?>[] {expectedToken}; + } + }; + + RMContext mockContext = mock(RMContext.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>()); + ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + InetSocketAddress sockAddr = + InetSocketAddress.createUnresolved("localhost", 1234); + when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + dtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); + dtr.init(conf); + dtr.start(); + + final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); + + Collection<ApplicationId> appIds = new ArrayList<ApplicationId>(1); + appIds.add(appId1); + + dtr.addApplicationSync(appId1, credsx, false, "user1"); + + // Ensure incrTokenSequenceNo has been called for new token request + Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo(); + + DelegationTokenToRenew dttr = new DelegationTokenToRenew(appIds, + expectedToken, conf, 1000, false, "user1"); + + dtr.requestNewHdfsDelegationTokenIfNeeded(dttr); + + // Ensure incrTokenSequenceNo has been called for token renewal as well. + Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org