http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java new file mode 100644 index 0000000..eb7beef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protobuf-backed implementation of {@link ReportNewAggregatorsInfoRequest}.
+ * <p>
+ * The repeated {@code app_aggregators} field is exposed through a locally
+ * cached {@code List<AppAggregatorsMap>}. That list is converted back into
+ * the protobuf builder only when {@link #getProto()} is called.
+ */
+@Private
+@Unstable
+public class ReportNewAggregatorsInfoRequestPBImpl extends
+    ReportNewAggregatorsInfoRequest {
+
+  ReportNewAggregatorsInfoRequestProto proto =
+      ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
+
+  ReportNewAggregatorsInfoRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  // Locally cached entries; null until loaded by the getter or assigned by
+  // the setter.
+  private List<AppAggregatorsMap> aggregatorsList = null;
+
+  public ReportNewAggregatorsInfoRequestPBImpl() {
+    builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
+  }
+
+  public ReportNewAggregatorsInfoRequestPBImpl(
+      ReportNewAggregatorsInfoRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the cached entries into the builder and returns the built proto.
+   *
+   * @return the protobuf form of this request
+   */
+  public ReportNewAggregatorsInfoRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  /** Renders the proto in protobuf's single-line debug text format. */
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  /** Flushes cached state into the builder and rebuilds {@link #proto}. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (aggregatorsList != null) {
+      addLocalAggregatorsToProto();
+    }
+  }
+
+  /** Ensures a mutable builder seeded from the current proto exists. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Replaces the builder's repeated field with the cached entries. */
+  private void addLocalAggregatorsToProto() {
+    maybeInitBuilder();
+    builder.clearAppAggregators();
+    List<AppAggregatorsMapProto> protoList =
+        new ArrayList<AppAggregatorsMapProto>(this.aggregatorsList.size());
+    for (AppAggregatorsMap m : this.aggregatorsList) {
+      protoList.add(convertToProtoFormat(m));
+    }
+    builder.addAllAppAggregators(protoList);
+  }
+
+  /** Loads the cache from whichever of proto/builder is current. */
+  private void initLocalAggregatorsList() {
+    ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ?
+        proto : builder;
+    // Named protoList so it does not shadow the aggregatorsList field.
+    List<AppAggregatorsMapProto> protoList = p.getAppAggregatorsList();
+    this.aggregatorsList =
+        new ArrayList<AppAggregatorsMap>(protoList.size());
+    for (AppAggregatorsMapProto m : protoList) {
+      this.aggregatorsList.add(convertFromProtoFormat(m));
+    }
+  }
+
+  /**
+   * Returns the cached, mutable list of entries. Changes made to the
+   * returned list are written back on the next {@link #getProto()} call.
+   */
+  @Override
+  public List<AppAggregatorsMap> getAppAggregatorsList() {
+    if (this.aggregatorsList == null) {
+      initLocalAggregatorsList();
+    }
+    return this.aggregatorsList;
+  }
+
+  /**
+   * Replaces all entries. A {@code null} argument clears the repeated field,
+   * so a later {@link #getAppAggregatorsList()} returns an empty list.
+   */
+  @Override
+  public void setAppAggregatorsList(List<AppAggregatorsMap> appAggregatorsList) {
+    maybeInitBuilder();
+    if (appAggregatorsList == null) {
+      builder.clearAppAggregators();
+    }
+    this.aggregatorsList = appAggregatorsList;
+  }
+
+  private AppAggregatorsMapPBImpl convertFromProtoFormat(
+      AppAggregatorsMapProto p) {
+    return new AppAggregatorsMapPBImpl(p);
+  }
+
+  // Assumes every entry is an AppAggregatorsMapPBImpl (see the cast).
+  private AppAggregatorsMapProto convertToProtoFormat(
+      AppAggregatorsMap m) {
+    return ((AppAggregatorsMapPBImpl) m).getProto();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java new file mode 100644 index 0000000..0f0925a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protobuf-backed implementation of {@link ReportNewAggregatorsInfoResponse}.
+ * The underlying response message currently declares no fields.
+ */
+@Private
+@Unstable
+public class ReportNewAggregatorsInfoResponsePBImpl extends
+    ReportNewAggregatorsInfoResponse {
+
+  ReportNewAggregatorsInfoResponseProto proto =
+      ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
+
+  ReportNewAggregatorsInfoResponseProto.Builder builder = null;
+
+  boolean viaProto = false;
+
+  public ReportNewAggregatorsInfoResponsePBImpl() {
+    builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
+  }
+
+  public ReportNewAggregatorsInfoResponsePBImpl(
+      ReportNewAggregatorsInfoResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * @return the protobuf form of this response, built from the builder if
+   *         the record is not already backed by a proto
+   */
+  public ReportNewAggregatorsInfoResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
new file mode 100644
index 0000000..67c377d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Record pairing an {@link ApplicationId} with an aggregator address string.
+ * Concrete instances are obtained through {@link Records#newRecord(Class)}.
+ */
+@Private
+@Unstable
+public abstract class AppAggregatorsMap {
+
+  /**
+   * Creates a record holding the given application id and aggregator address.
+   *
+   * @param id the application id to store
+   * @param aggregatorAddr the aggregator address to store
+   * @return a new, populated {@link AppAggregatorsMap}
+   */
+  public static AppAggregatorsMap newInstance(
+      ApplicationId id, String aggregatorAddr) {
+    AppAggregatorsMap appAggregatorMap =
+        Records.newRecord(AppAggregatorsMap.class);
+    appAggregatorMap.setApplicationId(id);
+    appAggregatorMap.setAggregatorAddr(aggregatorAddr);
+    return appAggregatorMap;
+  }
+
+  /** @return the application id of this entry */
+  public abstract ApplicationId getApplicationId();
+
+  /** @param id the application id of this entry */
+  public abstract void setApplicationId(
+      ApplicationId id);
+
+  /** @return the aggregator address of this entry */
+  public abstract String getAggregatorAddr();
+
+  /** @param addr the aggregator address of this entry */
+  public abstract void setAggregatorAddr(
+      String addr);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
new file mode 100644
index 0000000..32903e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.
You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * {@link AppAggregatorsMap} backed by an {@link AppAggregatorsMapProto}.
+ * <p>
+ * Setter arguments are cached in local fields and copied into the protobuf
+ * builder only when {@link #getProto()} runs. Getters return the cached
+ * value when present, otherwise they lazily read it from the current proto
+ * (or builder) and cache it.
+ */
+@Private
+@Unstable
+public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
+
+  AppAggregatorsMapProto proto =
+      AppAggregatorsMapProto.getDefaultInstance();
+
+  AppAggregatorsMapProto.Builder builder = null;
+  boolean viaProto = false;
+
+  // Locally cached field values; null means "nothing cached".
+  private ApplicationId cachedAppId = null;
+  private String cachedAggregatorAddr = null;
+
+  public AppAggregatorsMapPBImpl() {
+    builder = AppAggregatorsMapProto.newBuilder();
+  }
+
+  public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Flushes cached values into the builder and returns the built proto.
+   *
+   * @return the protobuf form of this record
+   */
+  public AppAggregatorsMapProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !other.getClass().isAssignableFrom(getClass())) {
+      return false;
+    }
+    return getProto().equals(getClass().cast(other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    if (cachedAppId != null) {
+      return cachedAppId;
+    }
+    AppAggregatorsMapProtoOrBuilder source = viaProto ? proto : builder;
+    if (source.hasAppId()) {
+      cachedAppId = convertFromProtoFormat(source.getAppId());
+    }
+    return cachedAppId;
+  }
+
+  @Override
+  public String getAggregatorAddr() {
+    if (cachedAggregatorAddr != null) {
+      return cachedAggregatorAddr;
+    }
+    AppAggregatorsMapProtoOrBuilder source = viaProto ? proto : builder;
+    if (source.hasAppAggregatorAddr()) {
+      cachedAggregatorAddr = source.getAppAggregatorAddr();
+    }
+    return cachedAggregatorAddr;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null) {
+      builder.clearAppId();
+    }
+    cachedAppId = appId;
+  }
+
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    maybeInitBuilder();
+    if (aggregatorAddr == null) {
+      builder.clearAppAggregatorAddr();
+    }
+    cachedAggregatorAddr = aggregatorAddr;
+  }
+
+  /** Ensures a mutable builder seeded from the current proto exists. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AppAggregatorsMapProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Copies cached values into the builder and rebuilds {@link #proto}. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    if (cachedAppId != null) {
+      builder.setAppId(convertToProtoFormat(cachedAppId));
+    }
+    if (cachedAggregatorAddr != null) {
+      builder.setAppAggregatorAddr(cachedAggregatorAddr);
+    }
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private static ApplicationIdPBImpl convertFromProtoFormat(
+      ApplicationIdProto idProto) {
+    return new ApplicationIdPBImpl(idProto);
+  }
+
+  // Assumes the id is an ApplicationIdPBImpl (see the cast).
+  private static ApplicationIdProto convertToProtoFormat(ApplicationId id) {
+    return ((ApplicationIdPBImpl) id).getProto();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
new file mode 100644
index 0000000..d7b05c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "AggregatorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+// The single RPC takes a request carrying a repeated list of
+// AppAggregatorsMapProto (application id + aggregator address) entries.
+service AggregatorNodemanagerProtocolService {
+  rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index c122b2a..6a853f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -62,6 +62,7 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
+  repeated AppAggregatorsMapProto registered_aggregators = 6;
 }
 
 message LogAggregationReportProto {
@@ -82,6 +83,7 @@ message NodeHeartbeatResponseProto {
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
   repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
+  repeated AppAggregatorsMapProto app_aggregators_map = 12;
 }
 
 message SystemCredentialsForAppsProto {
@@ -89,6 +91,25 @@ message SystemCredentialsForAppsProto {
   optional bytes credentialsForApp = 2;
 }
 
+////////////////////////////////////////////////////////////////////////
+////// Used by aggregatornodemanager_protocol.proto ////////////////////
+////////////////////////////////////////////////////////////////////////
+message AppAggregatorsMapProto {
+  optional ApplicationIdProto appId = 1;  // application of this entry
+  optional string appAggregatorAddr = 2;  // aggregator address string
+}
+
+//////////////////////////////////////////////////////
+/////// aggregatornodemanager_protocol.proto /////////
+//////////////////////////////////////////////////////
+message ReportNewAggregatorsInfoRequestProto {
+  repeated AppAggregatorsMapProto app_aggregators = 1;
+}
+
+// Intentionally empty: the RPC returns no payload.
+message ReportNewAggregatorsInfoResponseProto {
+}
+
+
 message NMContainerStatusProto {
   optional ContainerIdProto container_id = 1;
   optional ContainerStateProto container_state = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
new file mode 100644
index 0000000..af9d60f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -0,0 +1,345 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import 
org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestRPC { + + private static final String EXCEPTION_MSG = "test error"; + private static final String EXCEPTION_CAUSE = "exception cause"; + private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + public static final String ILLEGAL_NUMBER_MESSAGE = + "aggregators' number in ReportNewAggregatorsInfoRequest is not ONE."; + + public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0"; + + public static final ApplicationId DEFAULT_APP_ID = + ApplicationId.newInstance(0, 0); + + @Test + public void testUnknownCall() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, 
HadoopYarnProtoRPC.class + .getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + + // Any unrelated protocol would do + ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); + + try { + proxy.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + Assert.fail("Excepted RPC call to fail with unknown method."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().matches( + "Unknown method getNewApplication called on.*" + + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" + + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); + } catch (Exception e) { + e.printStackTrace(); + } finally { + server.stop(); + } + } + + @Test + public void testRPCOnAggregatorNodeManagerProtocol() throws IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class + .getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(AggregatorNodemanagerProtocol.class, + new DummyNMAggregatorService(), addr, conf, null, 1); + server.start(); + + // Test unrelated protocol wouldn't get response + ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); + + try { + unknownProxy.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + Assert.fail("Excepted RPC call to fail with unknown method."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().matches( + "Unknown method getNewApplication 
called on.*" + + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" + + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); + } catch (Exception e) { + e.printStackTrace(); + } + + // Test AggregatorNodemanagerProtocol get proper response + AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy( + AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf); + // Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get + // normally response. + try { + ReportNewAggregatorsInfoRequest request = + ReportNewAggregatorsInfoRequest.newInstance( + DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR); + proxy.reportNewAggregatorInfo(request); + } catch (YarnException e) { + Assert.fail("RPC call failured is not expected here."); + } + + // Verify empty request get YarnException back (by design in + // DummyNMAggregatorService) + try { + proxy.reportNewAggregatorInfo(Records + .newRecord(ReportNewAggregatorsInfoRequest.class)); + Assert.fail("Excepted RPC call to fail with YarnException."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE)); + } + + server.stop(); + } + + @Test + public void testHadoopProtoRPC() throws Exception { + test(HadoopYarnProtoRPC.class.getName()); + } + + private void test(String rpcClass) throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class); + ContainerManagementProtocol proxy = (ContainerManagementProtocol) + rpc.getProxy(ContainerManagementProtocol.class, + NetUtils.getConnectAddress(server), conf); + 
ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + ApplicationId applicationId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + ContainerId containerId = + ContainerId.newContainerId(applicationAttemptId, 100); + NodeId nodeId = NodeId.newInstance("localhost", 1234); + Resource resource = Resource.newInstance(1234, 2); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(containerId, "localhost", "user", + resource, System.currentTimeMillis() + 10000, 42, 42, + Priority.newInstance(0), 0); + Token containerToken = newContainerToken(nodeId, "password".getBytes(), + containerTokenIdentifier); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + containerToken); + List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + proxy.startContainers(allRequests); + + List<ContainerId> containerIds = new ArrayList<ContainerId>(); + containerIds.add(containerId); + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + GetContainerStatusesResponse response = + proxy.getContainerStatuses(gcsRequest); + List<ContainerStatus> statuses = response.getContainerStatuses(); + + //test remote exception + boolean exception = false; + try { + StopContainersRequest stopRequest = + recordFactory.newRecordInstance(StopContainersRequest.class); + stopRequest.setContainerIds(containerIds); + proxy.stopContainers(stopRequest); + } catch (YarnException e) { + exception = true; + Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG)); + Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE)); + System.out.println("Test Exception is " + e.getMessage()); + } catch (Exception ex) { + 
ex.printStackTrace(); + } finally { + server.stop(); + } + Assert.assertTrue(exception); + Assert.assertNotNull(statuses.get(0)); + Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState()); + } + + public class DummyContainerManager implements ContainerManagementProtocol { + + private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>(); + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) + throws YarnException { + GetContainerStatusesResponse response = + recordFactory.newRecordInstance(GetContainerStatusesResponse.class); + response.setContainerStatuses(statuses); + return response; + } + + @Override + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException { + StartContainersResponse response = + recordFactory.newRecordInstance(StartContainersResponse.class); + for (StartContainerRequest request : requests.getStartContainerRequests()) { + Token containerToken = request.getContainerToken(); + ContainerTokenIdentifier tokenId = null; + + try { + tokenId = newContainerTokenIdentifier(containerToken); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + ContainerStatus status = + recordFactory.newRecordInstance(ContainerStatus.class); + status.setState(ContainerState.RUNNING); + status.setContainerId(tokenId.getContainerID()); + status.setExitStatus(0); + statuses.add(status); + + } + return response; + } + + @Override + public StopContainersResponse stopContainers(StopContainersRequest request) + throws YarnException { + Exception e = new Exception(EXCEPTION_MSG, + new Exception(EXCEPTION_CAUSE)); + throw new YarnException(e); + } + } + + public static ContainerTokenIdentifier newContainerTokenIdentifier( + Token containerToken) throws IOException { + org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token = + new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>( + 
containerToken.getIdentifier() + .array(), containerToken.getPassword().array(), new Text( + containerToken.getKind()), + new Text(containerToken.getService())); + return token.decodeIdentifier(); + } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = + NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = + Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + // A dummy AggregatorNodemanagerProtocol implementation for tests: it accepts + // exactly one (appID, aggregatorAddr) pair, asserts that it matches + // DEFAULT_APP_ID / DEFAULT_AGGREGATOR_ADDR, and throws a YarnException + // carrying ILLEGAL_NUMBER_MESSAGE for any other number of pairs. + public class DummyNMAggregatorService + implements AggregatorNodemanagerProtocol { + + @Override + public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) + throws YarnException, IOException { + List<AppAggregatorsMap> appAggregators = request.getAppAggregatorsList(); + if (appAggregators.size() == 1) { + // check default appID and aggregatorAddr (expected value first, so + // JUnit failure messages read correctly) + AppAggregatorsMap appAggregator = appAggregators.get(0); + Assert.assertEquals(DEFAULT_APP_ID, + appAggregator.getApplicationId()); + Assert.assertEquals(DEFAULT_AGGREGATOR_ADDR, + appAggregator.getAggregatorAddr()); + } else { + throw new YarnException(ILLEGAL_NUMBER_MESSAGE); + } + + ReportNewAggregatorsInfoResponse response = + recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class); + return response; + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index d9eeb9d..876d1d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -25,7 +25,9 @@ import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -106,11 +108,14 @@ public class TestYarnServerApiClasses { original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); original.setNodeLabels(getValidNodeLabels()); + Map<ApplicationId, String> aggregators = getAggregators(); + original.setRegisteredAggregators(aggregators); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); + assertEquals(aggregators, copy.getRegisteredAggregators()); // check labels are coming with valid values Assert.assertTrue(original.getNodeLabels() .containsAll(copy.getNodeLabels())); @@ -147,6 +152,8 @@ public class TestYarnServerApiClasses { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); 
original.setResponseId(100); + Map<ApplicationId, String> aggregators = getAggregators(); + original.setAppAggregatorsMap(aggregators); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -156,6 +163,7 @@ public class TestYarnServerApiClasses { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); + assertEquals(aggregators, copy.getAppAggregatorsMap()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); } @@ -315,6 +323,15 @@ public class TestYarnServerApiClasses { return nodeLabels; } + private Map<ApplicationId, String> getAggregators() { + ApplicationId appID = ApplicationId.newInstance(1L, 1); + String aggregatorAddr = "localhost:0"; + Map<ApplicationId, String> aggregatorMap = + new HashMap<ApplicationId, String>(); + aggregatorMap.put(appID, aggregatorAddr); + return aggregatorMap; + } + private ContainerStatus getContainerStatus(int applicationId, int containerID, int appAttemptId) { ContainerStatus status = recordFactory http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 42a4234..ffce01e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -59,6 +59,19 @@ public interface Context { ConcurrentMap<ApplicationId, Application> getApplications(); Map<ApplicationId, Credentials> getSystemCredentialsForApps(); + + /** + * Get the aggregators that have registered with this NM, keyed by + * application. + * @return map from application ID to aggregator address. + */ + Map<ApplicationId, String> getRegisteredAggregators(); + + /** + * Get the aggregators known to this NM for its applications: those + * reported by the RM in heartbeat responses plus those registered locally. + * @return map from application ID to aggregator address. + */ + Map<ApplicationId, String> getKnownAggregators(); ConcurrentMap<ContainerId, Container> getContainers(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a06293d..de40816 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++
org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -92,8 +93,9 @@ public class NodeManager extends CompositeService private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + private NMAggregatorService nmAggregatorService; private NodeStatusUpdater nodeStatusUpdater; - private static CompositeServiceShutdownHook nodeManagerShutdownHook; + private static CompositeServiceShutdownHook nodeManagerShutdownHook; private NMStateStoreService nmStore = null; private AtomicBoolean isStopping = new AtomicBoolean(false); @@ -140,6 +142,10 @@ public class NodeManager extends CompositeService return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, metrics, aclsManager, dirsHandler); } + + protected NMAggregatorService createNMAggregatorService(Context context) { + return new NMAggregatorService(context); + } protected WebServer createWebServer(Context nmContext, ResourceView resourceView, ApplicationACLsManager aclsManager, @@ -314,6 +320,9 @@ public class NodeManager extends CompositeService metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); DefaultMetricsSystem.initialize("NodeManager"); + + this.nmAggregatorService = createNMAggregatorService(context); + addService(nmAggregatorService); // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. 
@@ -409,6 +418,12 @@ public class NodeManager extends CompositeService protected final ConcurrentMap<ContainerId, Container> containers = new ConcurrentSkipListMap<ContainerId, Container>(); + + protected Map<ApplicationId, String> registeredAggregators = + new ConcurrentHashMap<ApplicationId, String>(); + + protected Map<ApplicationId, String> knownAggregators = + new ConcurrentHashMap<ApplicationId, String>(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -534,6 +549,29 @@ public class NodeManager extends CompositeService getLogAggregationStatusForApps() { return this.logAggregationReportForApps; } + + @Override + public Map<ApplicationId, String> getRegisteredAggregators() { + return this.registeredAggregators; + } + + public void addRegisteredAggregators( + Map<ApplicationId, String> newRegisteredAggregators) { + this.registeredAggregators.putAll(newRegisteredAggregators); + // Update to knownAggregators as well so it can immediately be consumed by + // this NM's TimelineClient. 
+ this.knownAggregators.putAll(newRegisteredAggregators); + } + + @Override + public Map<ApplicationId, String> getKnownAggregators() { + return this.knownAggregators; + } + + public void addKnownAggregators( + Map<ApplicationId, String> knownAggregators) { + this.knownAggregators.putAll(knownAggregators); + } } @@ -599,6 +637,11 @@ public class NodeManager extends CompositeService public Context getNMContext() { return this.context; } + + // For testing + NMAggregatorService getNMAggregatorService() { + return this.nmAggregatorService; + } public static void main(String[] args) throws IOException { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 7c5c28b..8521d94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -710,7 +710,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements .getContainerTokenSecretManager().getCurrentKey(), NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey(), - nodeLabelsForHeartbeat); + nodeLabelsForHeartbeat, + 
NodeStatusUpdaterImpl.this.context.getRegisteredAggregators()); if (logAggregationEnabled) { // pull log aggregation status for application running in this NM @@ -797,6 +798,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements ((NMContext) context) .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } + + Map<ApplicationId, String> knownAggregators = response.getAppAggregatorsMap(); + ((NodeManager.NMContext)context).addKnownAggregators(knownAggregators); + } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java new file mode 100644 index 0000000..17150ba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.CompositeService; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; + +public class NMAggregatorService extends CompositeService implements + AggregatorNodemanagerProtocol { + + private static final Log LOG = LogFactory.getLog(NMAggregatorService.class); + + final Context context; + + private Server server; + + public NMAggregatorService(Context context) { + + super(NMAggregatorService.class.getName()); + this.context = context; + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + + 
InetSocketAddress aggregatorServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT); + + Configuration serverConf = new Configuration(conf); + + // TODO Security settings. + YarnRPC rpc = YarnRPC.create(conf); + + server = + rpc.getServer(AggregatorNodemanagerProtocol.class, this, + aggregatorServerAddress, serverConf, + this.context.getNMTokenSecretManager(), + conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT)); + + server.start(); + // start remaining services + super.serviceStart(); + LOG.info("NMAggregatorService started at " + aggregatorServerAddress); + } + + + @Override + public void serviceStop() throws Exception { + if (server != null) { + server.stop(); + } + // TODO may cleanup app aggregators running on this NM in future. 
+ super.serviceStop(); + } + + /** + * Record aggregators newly reported to this NM. Valid entries are added to + * the NM context's registered (and known) aggregators. + * @param request report listing (application ID, aggregator address) pairs + * @return an empty response + */ + @Override + public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) throws IOException { + List<AppAggregatorsMap> newAggregatorsList = request.getAppAggregatorsList(); + if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) { + Map<ApplicationId, String> newAggregatorsMap = + new HashMap<ApplicationId, String>(); + for (AppAggregatorsMap aggregator : newAggregatorsList) { + ApplicationId appId = aggregator.getApplicationId(); + String aggregatorAddr = aggregator.getAggregatorAddr(); + // NMContext keeps aggregators in ConcurrentHashMaps, which reject null + // keys/values; skip incomplete entries instead of failing the RPC. + if (appId == null || aggregatorAddr == null + || aggregatorAddr.isEmpty()) { + LOG.warn("Ignoring incomplete aggregator registration: appId=" + + appId + ", aggregatorAddr=" + aggregatorAddr); + continue; + } + newAggregatorsMap.put(appId, aggregatorAddr); + } + if (!newAggregatorsMap.isEmpty()) { + ((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap); + } + } + + return ReportNewAggregatorsInfoResponse.newInstance(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index e880c31..e9e2e83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -425,6 +425,10 @@ public class ApplicationImpl implements Application { new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId()); + // Remove aggregator info for finished apps. + // TODO check we remove related aggregators info in failure cases (YARN-3038) + app.context.getRegisteredAggregators().remove(app.getAppId()); + app.context.getKnownAggregators().remove(app.getAppId()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index ff9b820..d94cd35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -352,6 +352,8 @@ public class ApplicationMasterService extends AbstractService implements RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + // Remove aggregator address when app get finished. + rmApp.removeAggregatorAddr(); // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after // RM work-preserving restart. 
@@ -600,6 +602,10 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setAvailableResources(allocation.getResourceLimit()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add aggregator address for this application + allocateResponse.setAggregatorAddr( + this.rmContext.getRMApps().get(applicationId).getAggregatorAddr()); // add preemption to the allocateResponse message (if any) allocateResponse http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3c2c09b..dd1fdc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -23,8 +23,10 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import 
org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -440,6 +443,11 @@ public class ResourceTrackerService extends AbstractService implements return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC, message); } + + // Check & update aggregators info from request. + // TODO make sure it won't have race condition issue for AM failed over case + // that the older registration could possible override the newer one. + updateAppAggregatorsMap(request); // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils @@ -455,12 +463,20 @@ public class ResourceTrackerService extends AbstractService implements if (!systemCredentials.isEmpty()) { nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } + + // Return aggregators' map that NM needs to know + // TODO we should optimize this to only include aggreator info that NM + // doesn't know yet. + List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications(); + if (keepAliveApps != null) { + setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); + } // 4. Send status to RMNode, saving the latest response. 
RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse); + keepAliveApps, nodeHeartBeatResponse); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request @@ -483,6 +499,55 @@ public class ResourceTrackerService extends AbstractService implements return nodeHeartBeatResponse; } + + /** + * Put into the heartbeat response the aggregator address of every live + * application that has one recorded in the RM. + * @param liveApps applications the NM reported as keep-alive + * @param response heartbeat response to populate + */ + private void setAppAggregatorsMapToResponse( + List<ApplicationId> liveApps, NodeHeartbeatResponse response) { + Map<ApplicationId, String> liveAppAggregatorsMap = new + ConcurrentHashMap<ApplicationId, String>(); + Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); + for (ApplicationId appId : liveApps) { + RMApp rmApp = rmApps.get(appId); + if (rmApp == null) { + // An app reported by the NM may not be tracked in RMContext; skip it + // rather than failing the whole heartbeat with an NPE. + if (LOG.isDebugEnabled()) { + LOG.debug("Application " + appId + " is not found in RMContext!"); + } + continue; + } + String appAggregatorAddr = rmApp.getAggregatorAddr(); + if (appAggregatorAddr != null) { + liveAppAggregatorsMap.put(appId, appAggregatorAddr); + } else { + // Log a debug info if aggregator address is not found.
+ if (LOG.isDebugEnabled()) { + LOG.debug("Aggregator for application: " + appId + " hasn't registered yet!"); + } + } + } + response.setAppAggregatorsMap(liveAppAggregatorsMap); + } + + /** + * Dispatch an aggregator update event for every application whose + * aggregator address reported by the NM differs from the one the RM holds. + * @param request heartbeat request carrying the NM's registered aggregators + */ + private void updateAppAggregatorsMap(NodeHeartbeatRequest request) { + Map<ApplicationId, String> registeredAggregatorsMap = + request.getRegisteredAggregators(); + if (registeredAggregatorsMap != null + && !registeredAggregatorsMap.isEmpty()) { + Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); + for (Map.Entry<ApplicationId, String> entry: + registeredAggregatorsMap.entrySet()) { + ApplicationId appId = entry.getKey(); + String aggregatorAddr = entry.getValue(); + if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) { + RMApp rmApp = rmApps.get(appId); + if (rmApp == null) { + LOG.warn("Cannot update aggregator info because application ID: " + + appId + " is not found in RMContext!"); + } else { + String previousAggregatorAddr = rmApp.getAggregatorAddr(); + // Compare by value: '!=' compares references, so an unchanged + // address read from a new heartbeat would look different. + // aggregatorAddr is non-null here, so equals() also covers a null + // previous address. + if (!aggregatorAddr.equals(previousAggregatorAddr)) { + // sending aggregator update event.
+ RMAppAggregatorUpdateEvent event = + new RMAppAggregatorUpdateEvent(appId, aggregatorAddr); + rmContext.getDispatcher().getEventHandler().handle(event); + } + } + } + } + } + } @SuppressWarnings("unchecked") @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index be9dfaf..8fbedb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -174,6 +174,23 @@ public interface RMApp extends EventHandler<RMAppEvent> { * @return the tracking url for the application master. */ String getTrackingUrl(); + + /** + * The aggregator address for the application. + * @return the address for the application's aggregator. + */ + String getAggregatorAddr(); + + /** + * Set aggregator address for the application + * @param aggregatorAddr the address of aggregator + */ + void setAggregatorAddr(String aggregatorAddr); + + /** + * Remove aggregator address when application is finished or killed. + */ + void removeAggregatorAddr(); /** * The original tracking url for the application master. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java new file mode 100644 index 0000000..b43de44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * {@link RMAppEvent} of type {@link RMAppEventType#AGGREGATOR_UPDATE} that + * carries a (new) aggregator address for an application. + */ +public class RMAppAggregatorUpdateEvent extends RMAppEvent { + + private final String appAggregatorAddr; + + public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) { + super(appId, RMAppEventType.AGGREGATOR_UPDATE); + this.appAggregatorAddr = appAggregatorAddr; + } + + /** + * @return the aggregator address carried by this event. + */ + public String getAppAggregatorAddr(){ + return this.appAggregatorAddr; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 668c5e1..6e9460a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -30,6 +30,9 @@ public enum RMAppEventType { // Source: Scheduler APP_ACCEPTED, + + // Source: ResourceTrackerService (aggregator info from NM heartbeats) + AGGREGATOR_UPDATE, // Source: RMAppAttempt ATTEMPT_REGISTERED, http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 42ff1de..9e63cea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -142,6 +142,7 @@ public class RMAppImpl implements RMApp, Recoverable { private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private String aggregatorAddr; // This field isn't protected by readlock now. 
private volatile RMAppAttempt currentAttempt; private String queue; @@ -187,6 +188,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, @@ -203,6 +206,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW_SAVING state .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, @@ -221,6 +226,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -237,6 +244,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.AGGREGATOR_UPDATE, new 
RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED) .addTransition(RMAppState.ACCEPTED, @@ -263,6 +272,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, new FinalSavingTransition( @@ -292,6 +303,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -303,6 +316,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -314,6 +329,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.KILLING, RMAppState.KILLING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.KILLING, 
RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -531,6 +548,21 @@ public class RMAppImpl implements RMApp, Recoverable { public void setQueue(String queue) { this.queue = queue; } + + @Override + public String getAggregatorAddr() { + /* NOTE(review): field is not volatile and is read without any lock here; confirm cross-thread visibility is acceptable. */ return this.aggregatorAddr; + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + this.aggregatorAddr = aggregatorAddr; + } + + @Override + public void removeAggregatorAddr() { + this.aggregatorAddr = null; + } @Override public String getName() { @@ -785,6 +817,8 @@ public class RMAppImpl implements RMApp, Recoverable { this.diagnostics.append(appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); + // TODO recover the aggregator address once it is saved to RMStateStore. + //this.aggregatorAddr = appState.getAggregatorAddr(); for(int i=0; i<appState.getAttemptCount(); ++i) { // create attempt @@ -826,9 +860,24 @@ public class RMAppImpl implements RMApp, Recoverable { SingleArcTransition<RMAppImpl, RMAppEvent> { public void transition(RMAppImpl app, RMAppEvent event) { }; - } + /** Copies the address carried by an RMAppAggregatorUpdateEvent into the app; registered as a self-loop, so the app state does not change. */ private static final class RMAppAggregatorUpdateTransition + extends RMAppTransition { + + public void transition(RMAppImpl app, RMAppEvent event) { + LOG.info("Updating aggregator info for app: " + app.getApplicationId()); + + RMAppAggregatorUpdateEvent appAggregatorUpdateEvent = + (RMAppAggregatorUpdateEvent) event; + // Update aggregator address + app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr()); + + // TODO persist the address to RMStateStore so it can be recovered + // after an RM restart (it is currently kept in memory only). + }; + } + private static final class RMAppNodeUpdateTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index a23c789..d9cdee0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -94,6 +94,18 @@ public abstract class MockAsm extends MockApps { throw new UnsupportedOperationException("Not supported yet."); } @Override + public String getAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public void setAggregatorAddr(String aggregatorAddr) { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public void removeAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public ApplicationId getApplicationId() { throw new UnsupportedOperationException("Not supported yet."); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index c6ee3ba..83e99b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -283,4 +283,19 @@ public class MockRMApp implements RMApp { public LogAggregationStatus getLogAggregationStatusForAppReport() { return null; } + + @Override + public String getAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void removeAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + throw new UnsupportedOperationException("Not supported yet."); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java index cdc4e35..19920fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java @@ -94,10 +94,9 @@ public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService { * @return whether it was added successfully */ public boolean addApplication(ApplicationId appId) { - String appIdString = appId.toString(); AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appIdString); - return (aggregatorCollection.putIfAbsent(appIdString, aggregator) + new AppLevelTimelineAggregator(appId.toString()); + /* identity check: true only when the aggregator created here is the one now stored for appId */ return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbfc0537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java index 73b6d52..d6e2a18 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; +import java.io.IOException; import java.net.URI; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -30,9 +32,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -62,6 +70,12 @@ public class TimelineAggregatorsCollection extends CompositeService { // REST server for this aggregator collection private HttpServer2 timelineRestServer; + + private String timelineRestServerBindAddress; + + private AggregatorNodemanagerProtocol nmAggregatorService; + + private InetSocketAddress nmAggregatorServiceAddress; static final String AGGREGATOR_COLLECTION_ATTR_KEY = 
"aggregator.collection"; @@ -74,6 +88,16 @@ public class TimelineAggregatorsCollection extends CompositeService { } @Override + public void serviceInit(Configuration conf) throws Exception { + this.nmAggregatorServiceAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT); + + } + + @Override protected void serviceStart() throws Exception { startWebApp(); super.serviceStart(); @@ -95,9 +119,13 @@ public class TimelineAggregatorsCollection extends CompositeService { * starting the app level service * @return the aggregator associated with id after the potential put. */ - public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) { + public TimelineAggregator putIfAbsent(ApplicationId appId, + TimelineAggregator aggregator) { + String id = appId.toString(); + TimelineAggregator aggregatorInTable; + boolean aggregatorIsNew = false; synchronized (aggregators) { - TimelineAggregator aggregatorInTable = aggregators.get(id); + aggregatorInTable = aggregators.get(id); if (aggregatorInTable == null) { try { // initialize, start, and add it to the collection so it can be @@ -106,16 +134,30 @@ public class TimelineAggregatorsCollection extends CompositeService { aggregator.start(); aggregators.put(id, aggregator); LOG.info("the aggregator for " + id + " was added"); - return aggregator; + aggregatorInTable = aggregator; + aggregatorIsNew = true; } catch (Exception e) { throw new YarnRuntimeException(e); } } else { String msg = "the aggregator for " + id + " already exists!"; LOG.error(msg); - return aggregatorInTable; + } + + } + // Report to NM if a new aggregator is added. 
+ if (aggregatorIsNew) { + try { + reportNewAggregatorToNM(appId); + } catch (Exception e) { + // throw exception here as it cannot be used if failed report to NM + LOG.error("Failed to report a new aggregator for application: " + appId + + " to NM Aggregator Services."); + throw new YarnRuntimeException(e); } } + + return aggregatorInTable; } /** @@ -167,7 +209,10 @@ public class TimelineAggregatorsCollection extends CompositeService { String bindAddress = WebAppUtils.getWebAppBindURL(conf, YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); - LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); + this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( + NetUtils.createSocketAddr(bindAddress)); + LOG.info("Instantiating the per-node aggregator webapp at " + + timelineRestServerBindAddress); try { Configuration confForInfoServer = new Configuration(conf); confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); @@ -200,4 +245,27 @@ public class TimelineAggregatorsCollection extends CompositeService { throw new YarnRuntimeException(msg, e); } } + + private void reportNewAggregatorToNM(ApplicationId appId) + throws YarnException, IOException { + this.nmAggregatorService = getNMAggregatorService(); + ReportNewAggregatorsInfoRequest request = + ReportNewAggregatorsInfoRequest.newInstance(appId, + this.timelineRestServerBindAddress); + LOG.info("Report a new aggregator for application: " + appId + + " to NM Aggregator Services."); + nmAggregatorService.reportNewAggregatorInfo(request); + } + + // protected for test + protected AggregatorNodemanagerProtocol getNMAggregatorService(){ + Configuration conf = getConfig(); + final YarnRPC rpc = YarnRPC.create(conf); + + // TODO Security settings. + return (AggregatorNodemanagerProtocol) rpc.getProxy( + AggregatorNodemanagerProtocol.class, + nmAggregatorServiceAddress, conf); + } + }