This is an automated email from the ASF dual-hosted git repository.
eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a33ef4f YARN-8867. Added resource localization status to YARN service
status call. Contributed by Chandni Singh
a33ef4f is described below
commit a33ef4fd311784dc15401eb54c82e78528c4f961
Author: Eric Yang <[email protected]>
AuthorDate: Thu Jan 24 18:43:21 2019 -0500
YARN-8867. Added resource localization status to YARN service status call.
Contributed by Chandni Singh
---
.../v2/app/launcher/TestContainerLauncher.java | 9 +
.../v2/app/launcher/TestContainerLauncherImpl.java | 9 +
.../yarn/api/ContainerManagementProtocol.java | 19 ++
.../GetLocalizationStatusesRequest.java | 69 ++++++
.../GetLocalizationStatusesResponse.java | 87 +++++++
.../hadoop/yarn/api/records/LocalizationState.java | 36 +++
.../yarn/api/records/LocalizationStatus.java | 95 ++++++++
.../main/proto/containermanagement_protocol.proto | 4 +
.../src/main/proto/yarn_service_protos.proto | 28 +++
.../hadoop/yarn/service/api/records/Container.java | 30 +++
.../service/api/records/LocalizationStatus.java | 132 +++++++++++
.../hadoop/yarn/service/component/Component.java | 9 +-
.../component/instance/ComponentInstance.java | 141 ++++++++++-
.../containerlaunch/ContainerLaunchService.java | 28 ++-
.../service/provider/AbstractProviderService.java | 12 +-
.../yarn/service/provider/ProviderService.java | 40 +++-
.../yarn/service/provider/ProviderUtils.java | 19 +-
.../yarn/service/MockRunningServiceContext.java | 41 +++-
.../apache/hadoop/yarn/service/MockServiceAM.java | 2 +-
.../hadoop/yarn/service/ServiceTestUtils.java | 2 +
.../apache/hadoop/yarn/service/TestServiceAM.java | 7 +-
.../component/instance/TestComponentInstance.java | 61 +++++
.../yarn/service/provider/TestProviderUtils.java | 9 +-
.../apache/hadoop/yarn/client/api/NMClient.java | 34 +++
.../hadoop/yarn/client/api/impl/NMClientImpl.java | 56 +++++
.../ContainerManagementProtocolPBClientImpl.java | 22 ++
.../ContainerManagementProtocolPBServiceImpl.java | 20 ++
.../pb/GetLocalizationStatusesRequestPBImpl.java | 156 +++++++++++++
.../pb/GetLocalizationStatusesResponsePBImpl.java | 260 +++++++++++++++++++++
.../records/impl/pb/LocalizationStatusPBImpl.java | 192 +++++++++++++++
.../yarn/api/records/impl/pb/ProtoUtils.java | 20 ++
.../apache/hadoop/yarn/TestContainerLaunchRPC.java | 9 +
.../yarn/TestContainerResourceIncreaseRPC.java | 9 +
.../test/java/org/apache/hadoop/yarn/TestRPC.java | 9 +
.../containermanager/ContainerManagerImpl.java | 53 +++++
.../containermanager/container/Container.java | 7 +
.../containermanager/container/ContainerImpl.java | 16 +-
.../containermanager/localizer/ResourceSet.java | 45 +++-
.../containermanager/TestContainerManager.java | 128 ++++++++++
.../localizer/TestResourceSet.java | 106 +++++++++
.../server/nodemanager/webapp/MockContainer.java | 6 +
.../yarn/server/resourcemanager/NodeManager.java | 9 +
.../resourcemanager/TestAMAuthorization.java | 9 +
.../TestApplicationMasterLauncher.java | 9 +
44 files changed, 2029 insertions(+), 35 deletions(-)
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index d5bf03d..222c2ae 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@@ -516,5 +518,12 @@ public class TestContainerLauncher {
request) throws YarnException, IOException {
return null;
}
+
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
}
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
index 0ae0380..7788300 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -521,6 +523,13 @@ public class TestContainerLauncherImpl {
request) throws YarnException, IOException {
return null;
}
+
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
@SuppressWarnings("serial")
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
index 8fceb46..0444440 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@@ -288,4 +290,21 @@ public interface ContainerManagementProtocol {
@Unstable
CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException;
+
+ /**
+ * API to request for the localization statuses of requested containers from
+ * the Node Manager.
+ * @param request {@link GetLocalizationStatusesRequest} which includes the
+ * container ids of all the containers whose localization
+ * statuses are needed.
+ * @return {@link GetLocalizationStatusesResponse} which contains the
+ * localization statuses of all the requested containers.
+ * @throws YarnException Exception specific to YARN.
+ * @throws IOException IOException thrown from the RPC layer.
+ */
+ @Public
+ @Unstable
+ GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException;
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java
new file mode 100644
index 0000000..e6c3947
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * The request sent by an application master to the node manager to get
+ * {@link LocalizationStatus}es of containers.
+ *
+ * @see ContainerManagementProtocol#getLocalizationStatuses(
+ * GetLocalizationStatusesRequest)
+ */
+@Public
+@Unstable
+public abstract class GetLocalizationStatusesRequest {
+
+ @Public
+ @Unstable
+ public static GetLocalizationStatusesRequest newInstance(
+ List<ContainerId> containerIds) {
+ GetLocalizationStatusesRequest request =
+ Records.newRecord(GetLocalizationStatusesRequest.class);
+ request.setContainerIds(containerIds);
+ return request;
+ }
+
+ /**
+ * Get the list of container IDs of the containers for which the localization
+ * statuses are needed.
+ *
+ * @return the list of container IDs.
+ */
+ @Public
+ @Unstable
+ public abstract List<ContainerId> getContainerIds();
+
+ /**
+ * Sets the list of container IDs of containers for which the localization
+ * statuses are needed.
+ * @param containerIds the list of container IDs.
+ */
+ @Public
+ @Unstable
+ public abstract void setContainerIds(List<ContainerId> containerIds);
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java
new file mode 100644
index 0000000..89fca9f
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
+import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * The response sent by the node manager to an application master when
+ * localization statuses are requested.
+ *
+ * @see ContainerManagementProtocol#getLocalizationStatuses(
+ * GetLocalizationStatusesRequest)
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class GetLocalizationStatusesResponse {
+
+ public static GetLocalizationStatusesResponse newInstance(
+ Map<ContainerId, List<LocalizationStatus>> statuses,
+ Map<ContainerId, SerializedException> failedRequests) {
+ GetLocalizationStatusesResponse response =
+ Records.newRecord(GetLocalizationStatusesResponse.class);
+ response.setLocalizationStatuses(statuses);
+ return response;
+ }
+
+ /**
+ * Get all the container localization statuses.
+ *
+ * @return container localization statuses.
+ */
+ public abstract Map<ContainerId,
+ List<LocalizationStatus>> getLocalizationStatuses();
+
+ /**
+ * Sets the container localization statuses.
+ *
+ * @param statuses container localization statuses.
+ */
+ @InterfaceAudience.Private
+ public abstract void setLocalizationStatuses(
+ Map<ContainerId, List<LocalizationStatus>> statuses);
+
+
+ /**
+ * Get the containerId-to-exception map in which the exception indicates error
+ * from per container for failed requests.
+ *
+ * @return map of containerId-to-exception
+ */
+ @InterfaceAudience.Private
+ public abstract Map<ContainerId, SerializedException> getFailedRequests();
+
+ /**
+ * Set the containerId-to-exception map in which the exception indicates error
+ * from per container for failed request.
+ */
+ @InterfaceAudience.Private
+ public abstract void setFailedRequests(
+ Map<ContainerId, SerializedException> failedContainers);
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java
new file mode 100644
index 0000000..0505d5f
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * State of localization.
+ */
+@Public
+@Unstable
+public enum LocalizationState {
+
+ PENDING,
+
+ COMPLETED,
+
+ FAILED
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java
new file mode 100644
index 0000000..bca95b7
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Represents the localization status of a resource.
+ * The status of the localization includes:
+ * <ul>
+ * <li>resource key</li>
+ * <li>{@link LocalizationState} of the resource</li>
+ * </ul>
+ */
+@Public
+@Unstable
+public abstract class LocalizationStatus {
+
+ public static LocalizationStatus newInstance(String resourceKey,
+ LocalizationState localizationState) {
+ return newInstance(resourceKey, localizationState, null);
+ }
+
+ public static LocalizationStatus newInstance(String resourceKey,
+ LocalizationState localizationState,
+ String diagnostics) {
+ LocalizationStatus status = Records.newRecord(LocalizationStatus.class);
+ status.setResourceKey(resourceKey);
+ status.setLocalizationState(localizationState);
+ status.setDiagnostics(diagnostics);
+ return status;
+ }
+
+ /**
+ * Get the resource key.
+ *
+ * @return resource key.
+ */
+ public abstract String getResourceKey();
+
+ /**
+ * Sets the resource key.
+ * @param resourceKey
+ */
+ @InterfaceAudience.Private
+ public abstract void setResourceKey(String resourceKey);
+
+ /**
+ * Get the localization state.
+ *
+ * @return localization state.
+ */
+ public abstract LocalizationState getLocalizationState();
+
+ /**
+ * Sets the localization state.
+ * @param state localization state
+ */
+ @InterfaceAudience.Private
+ public abstract void setLocalizationState(LocalizationState state);
+
+ /**
+ * Get the diagnostics.
+ *
+ * @return diagnostics.
+ */
+ public abstract String getDiagnostics();
+
+ /**
+ * Sets the diagnostics.
+ * @param diagnostics diagnostics.
+ */
+ @InterfaceAudience.Private
+ public abstract void setDiagnostics(String diagnostics);
+
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
index 22b4406..1f8cafb 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
@@ -44,4 +44,8 @@ service ContainerManagementProtocolService {
rpc restartContainer(ContainerIdProto) returns (RestartContainerResponseProto);
rpc rollbackLastReInitialization(ContainerIdProto) returns (RollbackResponseProto);
rpc commitLastReInitialization(ContainerIdProto) returns (CommitResponseProto);
+
+ rpc getLocalizationStatuses(GetLocalizationStatusesRequestProto)
+ returns (GetLocalizationStatusesResponseProto);
+
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 248f775..b58b828 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -543,3 +543,31 @@ message RunSharedCacheCleanerTaskRequestProto {
message RunSharedCacheCleanerTaskResponseProto {
optional bool accepted = 1;
}
+
+// Localization
+message GetLocalizationStatusesRequestProto {
+ repeated ContainerIdProto container_id = 1;
+}
+
+message GetLocalizationStatusesResponseProto {
+ repeated ContainerLocalizationStatusesProto cntn_localization_statuses = 1;
+ repeated ContainerExceptionMapProto failed_requests = 2;
+}
+
+enum LocalizationStateProto {
+ L_PENDING = 1;
+ L_COMPLETED = 2;
+ L_FAILED = 3;
+}
+
+
+message LocalizationStatusProto {
+ optional string resource_key = 1;
+ optional LocalizationStateProto localization_state= 2;
+ optional string diagnostics = 3;
+}
+
+message ContainerLocalizationStatusesProto {
+ optional ContainerIdProto container_id = 1;
+ repeated LocalizationStatusProto localization_statuses = 2;
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
index 48d54e9..99ba799 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java
@@ -55,6 +55,7 @@ public class Container extends BaseResource {
private Artifact artifact = null;
private Boolean privilegedContainer = null;
private Map<String, List<Map<String, String>>> exposedPorts = null;
+ private List<LocalizationStatus> localizationStatuses = null;
/**
* Unique container id of a running service, e.g.
@@ -258,6 +259,35 @@ public class Container extends BaseResource {
this.exposedPorts = ports;
}
+ /**
+ * Localization statuses.
+ */
+ @ApiModelProperty(example = "null", value =
+ "Localization statuses of a container.")
+ @JsonProperty("localization_statuses")
+ public List<LocalizationStatus> getLocalizationStatuses() {
+ return localizationStatuses;
+ }
+
+ /**
+ * Sets the localization statuses.
+ * @param statuses localization statuses.
+ */
+ @XmlElement(name = "localization_statuses")
+ public void setLocalizationStatuses(List<LocalizationStatus> statuses) {
+ this.localizationStatuses = statuses;
+ }
+
+ /**
+ * Sets the localization statuses and returns the container.
+ * @param statuses
+ * @return
+ */
+ public Container localizationStatuses(List<LocalizationStatus> statuses) {
+ this.localizationStatuses = statuses;
+ return this;
+ }
+
@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java
new file mode 100644
index 0000000..3f76ba3
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+/**
+ * The status of localization.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "Localization status of a resource.")
+@XmlRootElement
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class LocalizationStatus implements Serializable {
+
+ private static final long serialVersionUID = -5745287278502373531L;
+
+ private String destFile;
+
+ private LocalizationState state;
+
+ private String diagnostics;
+
+ /**
+ * Destination file.
+ */
+ @JsonProperty("dest_file")
+ public String getDestFile() {
+ return destFile;
+ }
+
+ /**
+ * Sets the destination file.
+ *
+ * @param destFile destination file
+ */
+ @XmlElement(name = "dest_file")
+ public void setDestFile(String destFile) {
+ this.destFile = destFile;
+ }
+
+ /**
+ * Sets the destination file and returns the localization status.
+ *
+ * @param fileName destination file
+ */
+ public LocalizationStatus destFile(String fileName) {
+ this.destFile = fileName;
+ return this;
+ }
+
+ /**
+ * Localization state.
+ */
+ @JsonProperty("state")
+ public LocalizationState getState() {
+ return state;
+ }
+
+ /**
+ * Sets the localization state.
+ *
+ * @param localizationState localization state
+ */
+ @XmlElement(name = "state")
+ public void setState(LocalizationState localizationState) {
+ this.state = localizationState;
+ }
+
+ /**
+ * Sets the localization state and returns the localization status.
+ *
+ * @param localizationState localization state
+ */
+ public LocalizationStatus state(LocalizationState localizationState) {
+ this.state = localizationState;
+ return this;
+ }
+
+ /**
+ * Diagnostics.
+ */
+ @JsonProperty("diagnostics")
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /**
+ * Sets the diagnostics.
+ *
+ * @param diag diagnostics
+ */
+ @XmlElement(name = "diagnostics")
+ public void setDiagnostics(String diag) {
+ this.diagnostics = diag;
+ }
+
+ /**
+ * Sets the diagnostics and returns the localization status.
+ *
+ * @param diag diagnostics
+ */
+ public LocalizationStatus diagnostics(String diag) {
+ this.diagnostics = diag;
+ return this;
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index f885b25..8958dc7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.service.monitor.ComponentHealthThresholdMonitor;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
+import org.apache.hadoop.yarn.service.provider.ProviderService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
@@ -79,6 +80,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -695,19 +697,22 @@ public class Component implements EventHandler<ComponentEvent> {
"[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
getName(), container.getId(), instance.getCompInstanceName(),
container.getNodeId());
+ Future<ProviderService.ResolvedLaunchParams> resolvedParamFuture;
if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) {
UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ?
cancelUpgradeStatus : upgradeStatus;
- scheduler.getContainerLaunchService()
+ resolvedParamFuture = scheduler.getContainerLaunchService()
.launchCompInstance(scheduler.getApp(), instance, container,
createLaunchContext(status.getTargetSpec(),
status.getTargetVersion()));
} else {
- scheduler.getContainerLaunchService().launchCompInstance(
+ resolvedParamFuture = scheduler.getContainerLaunchService()
+ .launchCompInstance(
scheduler.getApp(), instance, container,
createLaunchContext(componentSpec, scheduler.getApp().getVersion()));
}
+ instance.updateResolvedLaunchParams(resolvedParamFuture);
}
public ContainerLaunchService.ComponentLaunchContext createLaunchContext(
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 27153da..ec62194 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
@@ -51,6 +53,7 @@ import
org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
import org.apache.hadoop.yarn.service.monitor.probe.DefaultProbe;
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
+import org.apache.hadoop.yarn.service.provider.ProviderService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
@@ -65,10 +68,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,6 +122,8 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
private String serviceVersion;
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
private boolean pendingCancelUpgrade = false;
+ private ProviderService.ResolvedLaunchParams resolvedParams;
+ private ScheduledFuture lclizationRetrieverFuture;
private static final StateMachineFactory<ComponentInstance,
ComponentInstanceState, ComponentInstanceEventType,
@@ -192,6 +201,9 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
ComponentInstanceEvent event) {
// Query container status for ip and host
compInstance.initializeStatusRetriever(event, 0);
+ compInstance.initializeLocalizationStatusRetriever(
+ event.getContainerId());
+
long containerStartTime = System.currentTimeMillis();
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
@@ -277,6 +289,7 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
} else {
instance.initializeStatusRetriever(event, 0);
}
+ instance.initializeLocalizationStatusRetriever(event.getContainerId());
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
instance.component.getUpgradeStatus() :
@@ -292,6 +305,17 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec);
}
+ try {
+ List<org.apache.hadoop.yarn.api.records.LocalizationStatus>
+ statusesFromNM = scheduler.getNmClient().getClient()
+ .getLocalizationStatuses(container.getId(), container.getNodeId());
+ if (statusesFromNM != null && !statusesFromNM.isEmpty()) {
+ updateLocalizationStatuses(statusesFromNM);
+ }
+ } catch (YarnException | IOException e) {
+ LOG.warn("{} failure getting localization statuses", container.getId(),
+ e);
+ }
}
private static class ContainerBecomeNotReadyTransition extends
BaseTransition {
@@ -411,6 +435,7 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
(status != null ? status.getDiagnostics() : UPGRADE_FAILED));
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
+ compInstance.cancelLclRetriever();
if (compInstance.getState().equals(READY)) {
compInstance.component.decContainersReady(true);
@@ -639,13 +664,16 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
cancelContainerStatusRetriever();
+ cancelLclRetriever();
setContainerStatus(container.getId(), null);
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
- scheduler.getContainerLaunchService()
+ Future<ProviderService.ResolvedLaunchParams> launchParamsFuture =
+ scheduler.getContainerLaunchService()
.reInitCompInstance(scheduler.getApp(), this,
this.container, this.component.createLaunchContext(
upgradeStatus.getTargetSpec(),
upgradeStatus.getTargetVersion()));
+ updateResolvedLaunchParams(launchParamsFuture);
}
private void initializeStatusRetriever(ComponentInstanceEvent event,
@@ -750,6 +778,61 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
return compInstanceId.getCompInstanceName();
}
+ @VisibleForTesting
+ void updateLocalizationStatuses(
+ List<org.apache.hadoop.yarn.api.records.LocalizationStatus> statuses) {
+ Map<String, String> resourcesCpy = new HashMap<>();
+ try {
+ readLock.lock();
+ if (resolvedParams == null || resolvedParams.didLaunchFail() ||
+ resolvedParams.getResolvedRsrcPaths() == null ||
+ resolvedParams.getResolvedRsrcPaths().isEmpty()) {
+ cancelLclRetriever();
+ return;
+ }
+ resourcesCpy.putAll(resolvedParams.getResolvedRsrcPaths());
+ } finally {
+ readLock.unlock();
+ }
+ boolean allCompleted = true;
+ Map<String, LocalizationStatus> fromNM = new HashMap<>();
+ statuses.forEach(statusFromNM -> {
+ LocalizationStatus lstatus = new LocalizationStatus()
+ .destFile(statusFromNM.getResourceKey())
+ .diagnostics(statusFromNM.getDiagnostics())
+ .state(statusFromNM.getLocalizationState());
+ fromNM.put(statusFromNM.getResourceKey(), lstatus);
+ });
+
+ for (String resourceKey : resourcesCpy.keySet()) {
+ LocalizationStatus lstatus = fromNM.get(resourceKey);
+ if (lstatus == null ||
+ lstatus.getState().equals(LocalizationState.PENDING)) {
+ allCompleted = false;
+ break;
+ }
+ }
+
+ List<LocalizationStatus> statusList = new ArrayList<>();
+ statusList.addAll(fromNM.values());
+ this.containerSpec.setLocalizationStatuses(statusList);
+ if (allCompleted) {
+ cancelLclRetriever();
+ }
+ }
+
+ public void updateResolvedLaunchParams(
+ Future<ProviderService.ResolvedLaunchParams> future) {
+ try {
+ writeLock.lock();
+ this.resolvedParams = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("{} updating resolved params", getCompInstanceId(), e);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
public ContainerStatus getContainerStatus() {
try {
readLock.lock();
@@ -916,6 +999,7 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
cancelContainerStatusRetriever();
scheduler.executorService.submit(() ->
cleanupRegistryAndCompHdfsDir(containerId));
+ cancelLclRetriever();
}
private void cleanupRegistry(ContainerId containerId) {
@@ -998,6 +1082,61 @@ public class ComponentInstance implements
EventHandler<ComponentInstanceEvent>,
}
}
+ private static class LocalizationStatusRetriever implements Runnable {
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private NMClient nmClient;
+ private ComponentInstance instance;
+
+ LocalizationStatusRetriever(ServiceScheduler scheduler,
+ ContainerId containerId, ComponentInstance instance) {
+ this.nmClient = scheduler.getNmClient().getClient();
+ this.containerId = containerId;
+ this.instance = instance;
+ this.nodeId = instance.getNodeId();
+ }
+
+ @Override
+ public void run() {
+ List<org.apache.hadoop.yarn.api.records.LocalizationStatus>
+ statusesFromNM = null;
+ try {
+ statusesFromNM = nmClient.getLocalizationStatuses(containerId,
+ nodeId);
+ } catch (YarnException | IOException e) {
+ LOG.error("{} Failed to get localization statuses for {} {} ",
+ instance.compInstanceId, nodeId, containerId, e);
+ }
+ if (statusesFromNM != null && !statusesFromNM.isEmpty()) {
+ instance.updateLocalizationStatuses(statusesFromNM);
+ }
+ }
+ }
+
+ private void initializeLocalizationStatusRetriever(
+ ContainerId containerId) {
+ LOG.info("{} retrieve localization statuses", compInstanceId);
+ lclizationRetrieverFuture = scheduler.executorService.scheduleAtFixedRate(
+ new LocalizationStatusRetriever(scheduler, containerId, this),
+ 0, 1, TimeUnit.SECONDS
+ );
+ }
+
+ private void cancelLclRetriever() {
+ if (lclizationRetrieverFuture != null &&
+ !lclizationRetrieverFuture.isDone()) {
+ LOG.info("{} cancelling localization retriever", compInstanceId);
+ lclizationRetrieverFuture.cancel(true);
+ }
+ }
+
+ @VisibleForTesting
+ boolean isLclRetrieverActive() {
+ return lclizationRetrieverFuture != null &&
+ !lclizationRetrieverFuture.isCancelled()
+ && !lclizationRetrieverFuture.isDone();
+ }
+
public String getHostname() {
return getCompInstanceName() + getComponent().getHostnameSuffix();
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index 153ab46..1574d6d 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -34,8 +34,12 @@ import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.apache.hadoop.yarn.service.provider.ProviderService.FAILED_LAUNCH_PARAMS;
public class ContainerLaunchService extends AbstractService{
@@ -65,24 +69,27 @@ public class ContainerLaunchService extends AbstractService{
super.serviceStop();
}
- public void launchCompInstance(Service service,
+ public Future<ProviderService.ResolvedLaunchParams> launchCompInstance(
+ Service service,
ComponentInstance instance, Container container,
ComponentLaunchContext componentLaunchContext) {
ContainerLauncher launcher =
new ContainerLauncher(service, instance, container,
componentLaunchContext, false);
- executorService.execute(launcher);
+ return executorService.submit(launcher);
}
- public void reInitCompInstance(Service service,
+ public Future<ProviderService.ResolvedLaunchParams> reInitCompInstance(
+ Service service,
ComponentInstance instance, Container container,
ComponentLaunchContext componentLaunchContext) {
ContainerLauncher reInitializer = new ContainerLauncher(service, instance,
container, componentLaunchContext, true);
- executorService.execute(reInitializer);
+ return executorService.submit(reInitializer);
}
- private class ContainerLauncher implements Runnable {
+ private class ContainerLauncher implements
+ Callable<ProviderService.ResolvedLaunchParams> {
public final Container container;
public final Service service;
public ComponentInstance instance;
@@ -99,12 +106,14 @@ public class ContainerLaunchService extends
AbstractService{
this.reInit = reInit;
}
- @Override public void run() {
+ @Override
+ public ProviderService.ResolvedLaunchParams call() {
ProviderService provider = ProviderFactory.getProviderService(
componentLaunchContext.getArtifact());
AbstractLauncher launcher = new AbstractLauncher(context);
+ ProviderService.ResolvedLaunchParams resolvedParams = null;
try {
- provider.buildContainerLaunchContext(launcher, service,
+ resolvedParams = provider.buildContainerLaunchContext(launcher, service,
instance, fs, getConfig(), container, componentLaunchContext);
if (!reInit) {
LOG.info("launching container {}", container.getId());
@@ -126,6 +135,11 @@ public class ContainerLaunchService extends
AbstractService{
.setInstance(instance).setContainerId(container.getId());
context.scheduler.getDispatcher().getEventHandler().handle(event);
}
+ if (resolvedParams != null) {
+ return resolvedParams;
+ } else {
+ return FAILED_LAUNCH_PARAMS;
+ }
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
index 4394e62..52f2a4e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
@@ -136,11 +136,13 @@ public abstract class AbstractProviderService implements
ProviderService,
}
}
- public void buildContainerLaunchContext(AbstractLauncher launcher,
+ public ResolvedLaunchParams buildContainerLaunchContext(
+ AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext)
throws IOException, SliderException {
+ ResolvedLaunchParams resolved = new ResolvedLaunchParams();
processArtifact(launcher, instance, fileSystem, service,
compLaunchContext);
ServiceContext context =
@@ -154,13 +156,13 @@ public abstract class AbstractProviderService implements
ProviderService,
fileSystem, yarnConf, container, compLaunchContext,
tokensForSubstitution);
- // create config file on hdfs and add local resource
+ // create config file on hdfs, add it as a local resource and record its resolved path
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
- compLaunchContext, tokensForSubstitution, instance, context);
+ compLaunchContext, tokensForSubstitution, instance, context, resolved);
// handles static files (like normal file / archive file) for localization.
ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem,
- compLaunchContext);
+ compLaunchContext, resolved);
// replace launch command with token specific information
buildContainerLaunchCommand(launcher, service, instance, fileSystem,
@@ -168,5 +170,7 @@ public abstract class AbstractProviderService implements
ProviderService,
// Setup container retry settings
buildContainerRetry(launcher, yarnConf, compLaunchContext, instance);
+
+ return resolved;
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
index fe765de..96b24d2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.provider;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
@@ -28,16 +29,53 @@ import
org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
public interface ProviderService {
/**
* Set up the entire container launch context
*/
- void buildContainerLaunchContext(AbstractLauncher containerLauncher,
+ ResolvedLaunchParams buildContainerLaunchContext(
+ AbstractLauncher containerLauncher,
Service service, ComponentInstance instance,
SliderFileSystem sliderFileSystem, Configuration yarnConf,
Container container,
ContainerLaunchService.ComponentLaunchContext componentLaunchContext)
throws IOException, SliderException;
+
+ /**
+ * This holds any information that is resolved during building the launch
+ * context for a container.
+ * <p>
+ * Right now it contains a mapping of resource keys to destination files
+ * for resources that need to be localized.
+ */
+ class ResolvedLaunchParams {
+ private Map<String, String> resolvedRsrcPaths = new HashMap<>();
+
+ void addResolvedRsrcPath(String resourceKey, String destFile) {
+ Preconditions.checkNotNull(destFile, "dest file cannot be null");
+ Preconditions.checkNotNull(resourceKey,
+ "local resource cannot be null");
+ resolvedRsrcPaths.put(resourceKey, destFile);
+ }
+
+ public Map<String, String> getResolvedRsrcPaths() {
+ return this.resolvedRsrcPaths;
+ }
+
+ public boolean didLaunchFail() {
+ return false;
+ }
+ }
+
+ ResolvedLaunchParams FAILED_LAUNCH_PARAMS = new ResolvedLaunchParams() {
+ @Override
+ public boolean didLaunchFail() {
+ return true;
+ }
+ };
+
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index c12c340..88883f7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -180,7 +180,10 @@ public class ProviderUtils implements YarnServiceConstants
{
AbstractLauncher launcher, SliderFileSystem fs,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution, ComponentInstance instance,
- ServiceContext context) throws IOException {
+ ServiceContext context, ProviderService.ResolvedLaunchParams
+ resolvedParams)
+ throws IOException {
+
Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext,
instance);
if (!fs.getFileSystem().exists(compInstanceDir)) {
log.info("{} version {} : Creating dir on hdfs: {}",
@@ -254,13 +257,15 @@ public class ProviderUtils implements
YarnServiceConstants {
fs.createAmResource(remoteFile, LocalResourceType.FILE);
Path destFile = new Path(configFile.getDestFile());
String symlink = APP_CONF_DIR + "/" + fileName;
- addLocalResource(launcher, symlink, configResource, destFile);
+ addLocalResource(launcher, symlink, configResource, destFile,
+ resolvedParams);
}
}
public static synchronized void handleStaticFilesForLocalization(
AbstractLauncher launcher, SliderFileSystem fs, ContainerLaunchService
- .ComponentLaunchContext componentLaunchCtx)
+ .ComponentLaunchContext componentLaunchCtx,
+ ProviderService.ResolvedLaunchParams resolvedParams)
throws IOException {
for (ConfigFile staticFile :
componentLaunchCtx.getConfiguration().getFiles()) {
@@ -298,13 +303,14 @@ public class ProviderUtils implements
YarnServiceConstants {
.isEmpty()) {
destFile = new Path(staticFile.getDestFile());
}
-
- addLocalResource(launcher, destFile.getName(), localResource, destFile);
+ addLocalResource(launcher, destFile.getName(), localResource, destFile,
+ resolvedParams);
}
}
private static void addLocalResource(AbstractLauncher launcher,
- String symlink, LocalResource localResource, Path destFile) {
+ String symlink, LocalResource localResource, Path destFile,
+ ProviderService.ResolvedLaunchParams resolvedParams) {
if (destFile.isAbsolute()) {
launcher.addLocalResource(symlink, localResource, destFile.toString());
log.info("Added file for localization: "+ symlink +" -> " +
@@ -315,6 +321,7 @@ public class ProviderUtils implements YarnServiceConstants {
log.info("Added file for localization: " + symlink+ " -> " +
localResource.getResource().getFile());
}
+ resolvedParams.addResolvedRsrcPath(symlink, destFile.toString());
}
// Static file is files uploaded by users before launch the service. Which
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
index b685f4b..0245cd6 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -35,16 +36,22 @@ import
org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
+import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
+import org.apache.hadoop.yarn.service.provider.ProviderService;
+import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Future;
import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -112,12 +119,38 @@ public class MockRunningServiceContext extends
ServiceContext {
this.scheduler.init(fsWatcher.getConf());
-
- doNothing().when(mockLaunchService).
- reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
+ when(mockLaunchService.launchCompInstance(anyObject(), anyObject(),
+ anyObject(), anyObject())).thenAnswer(
+ (Answer<Future<ProviderService.ResolvedLaunchParams>>)
+ this::launchAndReinitHelper);
+
+ when(mockLaunchService.reInitCompInstance(anyObject(), anyObject(),
+ anyObject(), anyObject())).thenAnswer((
+ Answer<Future<ProviderService.ResolvedLaunchParams>>)
+ this::launchAndReinitHelper);
stabilizeComponents(this);
}
+ private Future<ProviderService.ResolvedLaunchParams> launchAndReinitHelper(
+ InvocationOnMock invocation) throws IOException, SliderException {
+ AbstractLauncher launcher = new AbstractLauncher(
+ scheduler.getContext());
+ ComponentInstance instance = (ComponentInstance)
+ invocation.getArguments()[1];
+ Container container = (Container) invocation.getArguments()[2];
+ ContainerLaunchService.ComponentLaunchContext clc =
+ (ContainerLaunchService.ComponentLaunchContext)
+ invocation.getArguments()[3];
+
+ ProviderService.ResolvedLaunchParams resolvedParams =
+ new ProviderService.ResolvedLaunchParams();
+ ProviderUtils.createConfigFileAndAddLocalResource(launcher, fs, clc,
+ new HashMap<>(), instance, scheduler.getContext(), resolvedParams);
+ ProviderUtils.handleStaticFilesForLocalization(launcher, fs, clc,
+ resolvedParams);
+ return Futures.immediateFuture(resolvedParams);
+ }
+
private void stabilizeComponents(ServiceContext context) {
ApplicationId appId = ApplicationId.fromString(context.service.getId());
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
index 729287c..ae59c90 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
@@ -453,6 +453,6 @@ public class MockServiceAM extends ServiceMaster {
public void waitForContainerToRelease(ContainerId containerId)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> releasedContainers.contains(containerId),
- 1000, 9990000);
+ 1000, 30000);
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 6207d63..02cf601 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -62,6 +62,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@@ -403,6 +404,7 @@ public class ServiceTestUtils {
description.getClassName(), description.getMethodName());
conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
try {
+ Files.createDirectories(serviceBasePath);
fs = new SliderFileSystem(conf);
fs.setAppDir(new Path(serviceBasePath.toString()));
} catch (IOException e) {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
index 51c27e8..bbcbee2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -395,7 +395,7 @@ public class TestServiceAM extends ServiceTestUtils{
// Test to verify that the containers are released and the
// component instance is added to the pending queue when building the launch
// context fails.
- @Test(timeout = 9990000)
+ @Test(timeout = 30000)
public void testContainersReleasedWhenPreLaunchFails()
throws Exception {
ApplicationId applicationId = ApplicationId.newInstance(
@@ -420,6 +420,11 @@ public class TestServiceAM extends ServiceTestUtils{
// allocate a container
am.feedContainerToComp(exampleApp, containerId, "compa");
am.waitForContainerToRelease(containerId);
+ ComponentInstance compAinst0 = am.getCompInstance(compA.getName(),
+ "compa-0");
+ GenericTestUtils.waitFor(() ->
+ am.getComponent(compA.getName()).getPendingInstances()
+ .contains(compAinst0), 2000, 30000);
Assert.assertEquals(1,
am.getComponent("compa").getPendingInstances().size());
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
index c3b1602..f6ead01 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
@@ -18,16 +18,22 @@
package org.apache.hadoop.yarn.service.component.instance;
+import com.google.common.collect.Lists;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
+import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Configuration;
+import org.apache.hadoop.yarn.service.TestServiceManager;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
@@ -41,6 +47,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -261,6 +270,58 @@ public class TestComponentInstance {
validateCancelWhileUpgrading(false, false);
}
+ @Test // PENDING must keep the status retriever active; COMPLETED must stop it.
+ public void testUpdateLocalizationStatuses() throws Exception {
+ Service def = TestServiceManager.createBaseDef(
+ "testUpdateLocalizationStatuses");
+ // Create a real file under the service base path to use as a STATIC config file.
+ String file1 = rule.getServiceBasePath().toString() + "/file1";
+ Files.write(Paths.get(file1), "test file".getBytes(),
+ StandardOpenOption.CREATE_NEW);
+ // Attach that file to the first component of the service definition.
+ org.apache.hadoop.yarn.service.api.records.Component compDef =
+ def.getComponents().iterator().next();
+ ConfigFile configFile1 = new ConfigFile();
+ configFile1.setType(ConfigFile.TypeEnum.STATIC);
+ configFile1.setSrcFile(file1);
+ compDef.setConfiguration(new Configuration().files(
+ Lists.newArrayList(configFile1)));
+ // Build the mock service context and take the first instance of that component.
+ ServiceContext context = new MockRunningServiceContext(rule, def);
+ Component component = context.scheduler.getAllComponents().get(
+ compDef.getName());
+ ComponentInstance instance =
component.getAllComponentInstances().iterator()
+ .next();
+ LocalizationStatus status = LocalizationStatus.newInstance("file1",
+ LocalizationState.PENDING);
+ // First update: the resource is reported as PENDING.
+ instance.updateLocalizationStatuses(Lists.newArrayList(status));
+ Assert.assertTrue("retriever should still be active",
+ instance.isLclRetrieverActive());
+ // The container spec must mirror the reported resource key and state.
+ Container container = instance.getContainerSpec();
+ Assert.assertTrue(container.getLocalizationStatuses() != null);
+ Assert.assertEquals("dest file", // NOTE(review): actual is passed before expected here and below
+ container.getLocalizationStatuses().get(0).getDestFile(),
+ status.getResourceKey());
+ Assert.assertEquals("state",
+ container.getLocalizationStatuses().get(0).getState(),
+ status.getLocalizationState());
+ // Second update: the same resource is reported as COMPLETED.
+ status = LocalizationStatus.newInstance("file1",
+ LocalizationState.COMPLETED);
+ instance.updateLocalizationStatuses(Lists.newArrayList(status));
+ Assert.assertTrue("retriever should not be active",
+ !instance.isLclRetrieverActive());
+ Assert.assertTrue(container.getLocalizationStatuses() != null);
+ Assert.assertEquals("dest file",
+ container.getLocalizationStatuses().get(0).getDestFile(),
+ status.getResourceKey());
+ Assert.assertEquals("state",
+ container.getLocalizationStatuses().get(0).getState(),
+ status.getLocalizationState());
+ }
+
private void validateCancelWhileUpgrading(boolean upgradeSuccessful,
boolean cancelUpgradeSuccessful)
throws Exception {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
index 5d794d2..ff1fb7f 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
@@ -30,6 +30,7 @@ import
org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -152,13 +153,19 @@ public class TestProviderUtils {
configFileList.add(new ConfigFile().srcFile("hdfs://default/sourceFile4")
.type(ConfigFile.TypeEnum.STATIC));
+ ProviderService.ResolvedLaunchParams resolved =
+ new ProviderService.ResolvedLaunchParams();
ProviderUtils.handleStaticFilesForLocalization(launcher, sfs,
- compLaunchCtx);
+ compLaunchCtx, resolved);
Mockito.verify(launcher).addLocalResource(Mockito.eq("destFile1"),
any(LocalResource.class));
Mockito.verify(launcher).addLocalResource(
Mockito.eq("destFile_2"), any(LocalResource.class));
Mockito.verify(launcher).addLocalResource(
Mockito.eq("sourceFile4"), any(LocalResource.class));
+
+ Assert.assertEquals(3, resolved.getResolvedRsrcPaths().size());
+ Assert.assertEquals(resolved.getResolvedRsrcPaths().get("destFile1"),
+ "destFile1");
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
index 17168f7..a8b64cc 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -244,7 +247,38 @@ public abstract class NMClient extends AbstractService {
* @return NodeId of the container on which it is running.
*/
public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
+
return null;
}
+ /**
+ * Localize resources for a container.
+ * <p>
+ * This base implementation does nothing; subclasses are expected to
+ * override it.
+ *
+ * @param containerId the ID of the container
+ * @param nodeId node Id of the container
+ * @param localResources resources to localize
+ * @throws YarnException never thrown here; declared for overriding classes.
+ * @throws IOException never thrown here; declared for overriding classes.
+ */
+ @InterfaceStability.Unstable
+ public void localize(ContainerId containerId, NodeId nodeId,
+ Map<String, LocalResource> localResources) throws YarnException,
+ IOException {
+ // do nothing.
+ }
+
+ /**
+ * Get the localization statuses of a container.
+ * <p>
+ * This base implementation returns {@code null}; subclasses are expected to
+ * override it.
+ *
+ * @param containerId the Id of the container
+ * @param nodeId node Id of the container
+ *
+ * @return the localization statuses of the container, or {@code null} from
+ * this base implementation.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
+ */
+ @InterfaceStability.Unstable
+ public List<LocalizationStatus> getLocalizationStatuses(
+ ContainerId containerId, NodeId nodeId) throws YarnException,
+ IOException {
+ return null;
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index 017756e..96a93c2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,10 @@ import
org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -48,6 +52,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
@@ -464,4 +470,54 @@ public class NMClientImpl extends NMClient {
return null;
}
+ @Override // Sends a ResourceLocalizationRequest for a container found in startedContainers.
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ public void localize(ContainerId containerId, NodeId nodeId,
+ Map<String, LocalResource> localResources) throws YarnException,
+ IOException {
+ ContainerManagementProtocolProxyData proxy;
+ StartedContainer container = startedContainers.get(containerId); // null when not tracked here
+ if (container != null) {
+ synchronized (container) { // NOTE: the node comes from the tracked container; the nodeId argument is unused
+ proxy = cmProxy.getProxy(container.getNodeId().toString(),
containerId);
+ try {
+ proxy.getContainerManagementProtocol().localize(
+ ResourceLocalizationRequest.newInstance(containerId,
+ localResources));
+ } finally {
+ if (proxy != null) { // release the proxy whether or not the call succeeded
+ cmProxy.mayBeCloseProxy(proxy);
+ }
+ }
+ }
+ } else {
+ throw new YarnException("Unknown container [" + containerId + "]"); // untracked containers are rejected
+ }
+ }
+
+ @Override // Queries the node identified by nodeId for one container's localization statuses.
+ public List<LocalizationStatus> getLocalizationStatuses(
+ ContainerId containerId, NodeId nodeId) throws YarnException,
+ IOException {
+ // The request carries a single-element id list; the response maps are keyed by container id.
+ ContainerManagementProtocolProxyData proxy = null;
+ List<ContainerId> containerIds = Lists.newArrayList(containerId);
+ try {
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ GetLocalizationStatusesResponse response =
+ proxy.getContainerManagementProtocol().getLocalizationStatuses(
+ GetLocalizationStatusesRequest.newInstance(containerIds));
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(containerId)) {
+ Throwable t =
+ response.getFailedRequests().get(containerId).deSerialize();
+ parseAndThrowException(t); // presumably always throws - TODO confirm against the helper
+ }
+ return response.getLocalizationStatuses().get(containerId); // null when the map has no entry for this id
+ } finally {
+ if (proxy != null) { // proxy stays null if getProxy itself failed
+ cmProxy.mayBeCloseProxy(proxy);
+ }
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
index 7e471f3..af42021 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
@@ -32,6 +32,8 @@ import
org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@@ -52,6 +54,8 @@ import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateRespons
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesRequestPBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
@@ -74,6 +78,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateRequestProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
@@ -280,4 +285,21 @@ public class ContainerManagementProtocolPBClientImpl
implements ContainerManagem
return null;
}
}
+
+ @Override // Client side: unwrap the request to its proto, call the proxy, wrap the proto reply.
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request)
+ throws YarnException, IOException {
+ GetLocalizationStatusesRequestProto requestProto =
+ ((GetLocalizationStatusesRequestPBImpl) request).getProto(); // assumes the PB implementation; other types fail the cast
+ try {
+ return new GetLocalizationStatusesResponsePBImpl(
+ proxy.getLocalizationStatuses(null, requestProto)); // null: no controller object is supplied
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null; // presumably unreachable (helper is expected to throw) - TODO confirm
+ }
+ }
+
+
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
index 68e1645..ad8a756 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
@@ -26,6 +26,7 @@ import
org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
@@ -38,6 +39,8 @@ import
org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateResponsePBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesRequestPBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
@@ -65,6 +68,8 @@ import
org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResource
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesResponseProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerRequestProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerResponseProto;
import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
@@ -264,4 +269,19 @@ public class ContainerManagementProtocolPBServiceImpl
implements ContainerManage
throw new ServiceException(e);
}
}
+
+ @Override // Server side: wrap the proto request, delegate to the real protocol, return the reply's proto.
+ public GetLocalizationStatusesResponseProto getLocalizationStatuses(
+ RpcController controller, GetLocalizationStatusesRequestProto request)
+ throws ServiceException {
+ GetLocalizationStatusesRequestPBImpl lclReq =
+ new GetLocalizationStatusesRequestPBImpl(request);
+ try {
+ GetLocalizationStatusesResponse response = real.getLocalizationStatuses(
+ lclReq);
+ return ((GetLocalizationStatusesResponsePBImpl)response).getProto(); // assumes the PB implementation
+ } catch (YarnException | IOException e) {
+ throw new ServiceException(e); // both failure types are rethrown wrapped, cause preserved
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java
new file mode 100644
index 0000000..783098f
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProtoOrBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PB Impl of {@link GetLocalizationStatusesRequest}.
+ * <p>
+ * The container id list is cached locally and only written into the protobuf
+ * message when {@link #getProto()} is called. {@code viaProto} records
+ * whether {@code proto} (true) or {@code builder} (false) is the side that
+ * reads are served from.
+ */
+@Private
+@Unstable
+public class GetLocalizationStatusesRequestPBImpl extends
+ GetLocalizationStatusesRequest {
+ private GetLocalizationStatusesRequestProto proto =
+ GetLocalizationStatusesRequestProto.getDefaultInstance();
+ private GetLocalizationStatusesRequestProto.Builder builder;
+ private boolean viaProto = false;
+ // Local cache of the container ids; null until populated by the getter or setter.
+ private List<ContainerId> containerIds;
+ /** Creates an empty request backed by a fresh builder. */
+ public GetLocalizationStatusesRequestPBImpl() {
+ builder = GetLocalizationStatusesRequestProto.newBuilder();
+ }
+ /** Wraps an existing proto; the builder is created later by maybeInitBuilder(). */
+ public GetLocalizationStatusesRequestPBImpl(
+ GetLocalizationStatusesRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+ /** Writes any locally cached ids into the message and returns the built proto. */
+ public GetLocalizationStatusesRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build(); // mergeLocalToProto() already set viaProto, so proto is kept
+ viaProto = true;
+ return proto;
+ }
+ // Hash code and equality are both derived from the built proto.
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+ // Equal only if other's class is assignable from this class and the protos match.
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+ // Single-line text rendering of the built proto.
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+ // Copies the cached id list into the builder; a null cache leaves the builder untouched.
+ private void mergeLocalToBuilder() {
+ if (this.containerIds != null) {
+ addLocalContainerIdsToProto();
+ }
+ }
+ // Ensures a builder exists, merges the local cache into it and rebuilds proto.
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+ // Switches to builder mode, seeding a new builder from proto when needed.
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = GetLocalizationStatusesRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+ // Replaces the builder's container ids with the contents of the local list.
+ private void addLocalContainerIdsToProto() {
+ maybeInitBuilder();
+ builder.clearContainerId();
+ if (this.containerIds == null) {
+ return;
+ }
+ List<ContainerIdProto> protoList = new ArrayList<ContainerIdProto>();
+ for (ContainerId id : containerIds) {
+ protoList.add(convertToProtoFormat(id));
+ }
+ builder.addAllContainerId(protoList);
+ }
+ // Lazily fills the local list from proto or builder; does nothing once it is non-null.
+ private void initLocalContainerIds() {
+ if (this.containerIds != null) {
+ return;
+ }
+ GetLocalizationStatusesRequestProtoOrBuilder p = viaProto ? proto :
builder;
+ List<ContainerIdProto> toAdd = p.getContainerIdList();
+ this.containerIds = new ArrayList<>();
+ for (ContainerIdProto id : toAdd) {
+ this.containerIds.add(convertFromProtoFormat(id));
+ }
+ }
+ // Returns the cached list itself (not a copy), initializing it on first use.
+ @Override
+ public List<ContainerId> getContainerIds() {
+ initLocalContainerIds();
+ return this.containerIds;
+ }
+ // Stores the list locally; it reaches the message on the next getProto(). Null also clears the builder.
+ @Override
+ public void setContainerIds(List<ContainerId> containerIds) {
+ maybeInitBuilder();
+ if (containerIds == null) {
+ builder.clearContainerId();
+ }
+ this.containerIds = containerIds;
+ }
+ // Wraps a container id proto in its record implementation.
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+ // Assumes t is a ContainerIdPBImpl; any other implementation fails the cast.
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java
new file mode 100644
index 0000000..f42fa98
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
+import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalizationStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerLocalizationStatusesProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesResponseProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStatusProto;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PB Impl of {@link GetLocalizationStatusesResponse}.
+ * <p>
+ * The statuses map and the failed-requests map are cached locally and only
+ * written into the protobuf message when {@link #getProto()} is called.
+ * {@code viaProto} records whether {@code proto} (true) or {@code builder}
+ * (false) is the side that reads are served from.
+ */
+@Private
+@Unstable
+public class GetLocalizationStatusesResponsePBImpl extends
+ GetLocalizationStatusesResponse {
+ private GetLocalizationStatusesResponseProto proto =
+ GetLocalizationStatusesResponseProto.getDefaultInstance();
+ private GetLocalizationStatusesResponseProto.Builder builder;
+ private boolean viaProto = false;
+ // Local caches; each stays null until populated by its getter or setter.
+ private Map<ContainerId, List<LocalizationStatus>> localizationStatuses;
+ private Map<ContainerId, SerializedException> failedRequests;
+ /** Creates an empty response backed by a fresh builder. */
+ public GetLocalizationStatusesResponsePBImpl() {
+ builder = GetLocalizationStatusesResponseProto.newBuilder();
+ }
+ /** Wraps an existing proto; the builder is created later by maybeInitBuilder(). */
+ public GetLocalizationStatusesResponsePBImpl(
+ GetLocalizationStatusesResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+ /** Writes any locally cached maps into the message and returns the built proto. */
+ public GetLocalizationStatusesResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build(); // mergeLocalToProto() already set viaProto, so proto is kept
+ viaProto = true;
+ return proto;
+ }
+ // Hash code and equality are both derived from the built proto.
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+ // Equal only if other's class is assignable from this class and the protos match.
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+ // Single-line text rendering of the built proto.
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+ // Copies each non-null local map into the builder; null caches leave the builder untouched.
+ private void mergeLocalToBuilder() {
+ if (this.localizationStatuses != null) {
+ addLocalStatusesToProto();
+ }
+ if (this.failedRequests != null) {
+ addFailedRequestsToProto();
+ }
+ }
+ // Ensures a builder exists, merges the local caches into it and rebuilds proto.
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+ // Switches to builder mode, seeding a new builder from proto when needed.
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = GetLocalizationStatusesResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+ // Replaces the builder's per-container status entries with the local map's contents.
+ private void addLocalStatusesToProto() {
+ maybeInitBuilder();
+ builder.clearCntnLocalizationStatuses();
+ if (this.localizationStatuses == null) {
+ return;
+ }
+ List<ContainerLocalizationStatusesProto> protos =
+ new ArrayList<ContainerLocalizationStatusesProto>();
+ // One proto entry per container: its id plus all of its statuses.
+ this.localizationStatuses.forEach((containerId, statuses) -> {
+ if (statuses != null && !statuses.isEmpty()) { // containers without statuses are left out of the message
+ ContainerLocalizationStatusesProto.Builder clProtoBuilder =
+ ContainerLocalizationStatusesProto.newBuilder();
+ statuses.forEach(status -> {
+ clProtoBuilder.addLocalizationStatuses(convertToProtoFormat(status));
+ });
+ clProtoBuilder.setContainerId(convertToProtoFormat(containerId));
+ protos.add(clProtoBuilder.build());
+ }
+ });
+ builder.addAllCntnLocalizationStatuses(protos);
+ }
+ // Replaces the builder's failed-request entries with the local map's contents.
+ private void addFailedRequestsToProto() {
+ maybeInitBuilder();
+ builder.clearFailedRequests();
+ if (this.failedRequests == null) {
+ return;
+ }
+ List<ContainerExceptionMapProto> protoList =
+ new ArrayList<ContainerExceptionMapProto>();
+ for (Map.Entry<ContainerId, SerializedException> entry :
this.failedRequests
+ .entrySet()) {
+ protoList.add(ContainerExceptionMapProto.newBuilder()
+ .setContainerId(convertToProtoFormat(entry.getKey()))
+ .setException(convertToProtoFormat(entry.getValue())).build());
+ }
+ builder.addAllFailedRequests(protoList);
+ }
+
+ // Lazily fills the local statuses map from proto or builder; does nothing once it is non-null.
+ private void initLocalContainerStatuses() {
+ if (localizationStatuses != null) {
+ return;
+ }
+ GetLocalizationStatusesResponseProtoOrBuilder p = viaProto ? proto :
+ builder;
+ List<ContainerLocalizationStatusesProto> protoList =
+ p.getCntnLocalizationStatusesList();
+ localizationStatuses = new HashMap<>();
+ // Each proto entry becomes one map entry: container id -> list of statuses.
+ for (ContainerLocalizationStatusesProto clProto : protoList) {
+ List<LocalizationStatusProto> lsProtos =
+ clProto.getLocalizationStatusesList();
+
+ List<LocalizationStatus> statusesPerCntn = new ArrayList<>();
+ lsProtos.forEach(lsProto -> {
+ statusesPerCntn.add(convertFromProtoFormat(lsProto));
+ });
+
+
localizationStatuses.put(convertFromProtoFormat(clProto.getContainerId()),
+ statusesPerCntn);
+ }
+ }
+ // Lazily fills the local failed-requests map from proto or builder; does nothing once it is non-null.
+ private void initFailedRequests() {
+ if (this.failedRequests != null) {
+ return;
+ }
+ GetLocalizationStatusesResponseProtoOrBuilder p = viaProto ? proto :
+ builder;
+ List<ContainerExceptionMapProto> protoList = p.getFailedRequestsList();
+ this.failedRequests = new HashMap<>();
+ for (ContainerExceptionMapProto ce : protoList) {
+ this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()),
+ convertFromProtoFormat(ce.getException()));
+ }
+ }
+ // Returns the cached map itself (not a copy), initializing it on first use.
+ @Override
+ public Map<ContainerId, List<LocalizationStatus>> getLocalizationStatuses() {
+ initLocalContainerStatuses();
+ return this.localizationStatuses;
+ }
+ // Stores the map locally; it reaches the message on the next getProto(). Null also clears the builder.
+ @Override
+ public void setLocalizationStatuses(
+ Map<ContainerId, List<LocalizationStatus>> statuses) {
+ maybeInitBuilder();
+ if (statuses == null) {
+ builder.clearCntnLocalizationStatuses();
+ }
+ this.localizationStatuses = statuses;
+ }
+ // Returns the cached map itself (not a copy), initializing it on first use.
+ @Override
+ public Map<ContainerId, SerializedException> getFailedRequests() {
+ initFailedRequests();
+ return this.failedRequests;
+ }
+ // Stores the map locally; it reaches the message on the next getProto(). Null also clears the builder.
+ @Override
+ public void setFailedRequests(
+ Map<ContainerId, SerializedException> failedRequests) {
+ maybeInitBuilder();
+ if (failedRequests == null) {
+ builder.clearFailedRequests();
+ }
+ this.failedRequests = failedRequests;
+ }
+ // Converters between records and protos; the to-proto direction assumes the PB implementation classes.
+ private LocalizationStatusPBImpl convertFromProtoFormat(
+ LocalizationStatusProto p) {
+ return new LocalizationStatusPBImpl(p);
+ }
+
+ private LocalizationStatusProto convertToProtoFormat(
+ LocalizationStatus t) {
+ return ((LocalizationStatusPBImpl) t).getProto();
+ }
+
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+
+ private SerializedExceptionPBImpl convertFromProtoFormat(
+ SerializedExceptionProto p) {
+ return new SerializedExceptionPBImpl(p);
+ }
+
+ private SerializedExceptionProto convertToProtoFormat(SerializedException t)
{
+ return ((SerializedExceptionPBImpl) t).getProto();
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java
new file mode 100644
index 0000000..3e7a9fe
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStateProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStatusProto;
+import
org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStatusProtoOrBuilder;
+
+/**
+ * Protocol-buffer backed implementation of {@link LocalizationStatus}.
+ * <p>
+ * The record keeps its state in three places:
+ * <ul>
+ *   <li>{@code proto} - an immutable message, authoritative while
+ *       {@code viaProto} is {@code true};</li>
+ *   <li>{@code builder} - a mutable builder, authoritative while
+ *       {@code viaProto} is {@code false};</li>
+ *   <li>the local fields ({@code resourceKey}, {@code localizationState},
+ *       {@code diagnostics}) - values cached by getters or staged by
+ *       setters; they are written into the builder by
+ *       {@link #getProto()}.</li>
+ * </ul>
+ */
+@Private
+@Unstable
+public class LocalizationStatusPBImpl extends LocalizationStatus {
+ private LocalizationStatusProto proto =
+     LocalizationStatusProto.getDefaultInstance();
+ private LocalizationStatusProto.Builder builder;
+ private boolean viaProto = false;
+
+ // Locally staged / cached field values; null means "not set locally".
+ private String resourceKey;
+ private LocalizationState localizationState;
+ private String diagnostics;
+
+ /** Creates an empty record backed by a fresh builder. */
+ public LocalizationStatusPBImpl() {
+   builder = LocalizationStatusProto.newBuilder();
+ }
+
+ /**
+  * Creates a record that reads from the given proto message.
+  *
+  * @param proto the message to wrap.
+  */
+ public LocalizationStatusPBImpl(LocalizationStatusProto proto) {
+   this.proto = proto;
+   viaProto = true;
+ }
+
+ /**
+  * Merges any locally staged values into the message and returns it.
+  *
+  * @return the proto message reflecting the current state of this record.
+  */
+ public synchronized LocalizationStatusProto getProto() {
+   mergeLocalToProto();
+   proto = viaProto ? proto : builder.build();
+   viaProto = true;
+   return proto;
+ }
+
+ @Override
+ public int hashCode() {
+   return getProto().hashCode();
+ }
+
+ /**
+  * Two records are equal when the other object's class is this class or
+  * one of its supertypes and both serialize to equal proto messages.
+  */
+ @Override
+ public boolean equals(Object other) {
+   if (this == other) {
+     return true;
+   }
+   if (other == null) {
+     return false;
+   }
+   if (other.getClass().isAssignableFrom(this.getClass())) {
+     return this.getProto().equals(this.getClass().cast(other).getProto());
+   }
+   return false;
+ }
+
+ /**
+  * Renders the three fields for logging. The last field is not followed
+  * by a separator, so the output ends in {@code "Diagnostics: x]"}.
+  */
+ @Override
+ public String toString() {
+   StringBuilder sb = new StringBuilder();
+   sb.append("LocalizationStatus: [");
+   sb.append("ResourceKey: ").append(getResourceKey()).append(", ");
+   sb.append("LocalizationState: ").append(getLocalizationState())
+       .append(", ");
+   sb.append("Diagnostics: ").append(getDiagnostics());
+   sb.append("]");
+   return sb.toString();
+ }
+
+ /** Copies every non-null local field into the builder. */
+ private void mergeLocalToBuilder() {
+   if (resourceKey != null) {
+     builder.setResourceKey(this.resourceKey);
+   }
+   if (localizationState != null) {
+     builder.setLocalizationState(convertToProtoFormat(localizationState));
+   }
+   if (diagnostics != null) {
+     builder.setDiagnostics(diagnostics);
+   }
+ }
+
+ /**
+  * Writes the local fields into the builder (re-creating it from the
+  * current proto when the proto is authoritative) and rebuilds the proto.
+  */
+ private synchronized void mergeLocalToProto() {
+   if (viaProto) {
+     maybeInitBuilder();
+   }
+   mergeLocalToBuilder();
+   proto = builder.build();
+   viaProto = true;
+ }
+
+ /**
+  * Makes the builder authoritative. A new builder seeded from the current
+  * proto is created when the proto was authoritative or no builder exists.
+  */
+ private synchronized void maybeInitBuilder() {
+   if (viaProto || builder == null) {
+     builder = LocalizationStatusProto.newBuilder(proto);
+   }
+   viaProto = false;
+ }
+
+ /**
+  * @return the locally cached resource key if present, otherwise the value
+  *         from the proto/builder, or {@code null} when the field is unset.
+  */
+ @Override
+ public synchronized String getResourceKey() {
+   LocalizationStatusProtoOrBuilder p = viaProto ? proto : builder;
+   if (this.resourceKey != null) {
+     return this.resourceKey;
+   }
+   if (!p.hasResourceKey()) {
+     return null;
+   }
+   this.resourceKey = p.getResourceKey();
+   return this.resourceKey;
+ }
+
+ /**
+  * @param resourceKey the new resource key; {@code null} clears the field
+  *                    on the builder.
+  */
+ @Override
+ public synchronized void setResourceKey(String resourceKey) {
+   maybeInitBuilder();
+   if (resourceKey == null) {
+     builder.clearResourceKey();
+   }
+   this.resourceKey = resourceKey;
+ }
+
+ /**
+  * @return the locally cached state if present, otherwise the value
+  *         converted from the proto/builder, or {@code null} when unset.
+  */
+ @Override
+ public synchronized LocalizationState getLocalizationState() {
+   LocalizationStatusProtoOrBuilder p = viaProto ? proto : builder;
+   if (this.localizationState != null) {
+     return this.localizationState;
+   }
+   if (!p.hasLocalizationState()) {
+     return null;
+   }
+   this.localizationState = convertFromProtoFormat(p.getLocalizationState());
+   return localizationState;
+ }
+
+ /**
+  * @param localizationState the new state; {@code null} clears the field
+  *                          on the builder.
+  */
+ @Override
+ public synchronized void setLocalizationState(
+     LocalizationState localizationState) {
+   maybeInitBuilder();
+   if (localizationState == null) {
+     builder.clearLocalizationState();
+   }
+   this.localizationState = localizationState;
+ }
+
+ /**
+  * @return the locally cached diagnostics if present, otherwise the value
+  *         from the proto/builder, or {@code null} when the field is unset.
+  */
+ @Override
+ public synchronized String getDiagnostics() {
+   LocalizationStatusProtoOrBuilder p = viaProto ? proto : builder;
+   if (this.diagnostics != null) {
+     return this.diagnostics;
+   }
+   if (!p.hasDiagnostics()) {
+     return null;
+   }
+   this.diagnostics = p.getDiagnostics();
+   return diagnostics;
+ }
+
+ /**
+  * @param diagnostics the new diagnostics text; {@code null} clears the
+  *                    field on the builder.
+  */
+ @Override
+ public synchronized void setDiagnostics(String diagnostics) {
+   maybeInitBuilder();
+   if (diagnostics == null) {
+     builder.clearDiagnostics();
+   }
+   this.diagnostics = diagnostics;
+ }
+
+ private LocalizationStateProto convertToProtoFormat(LocalizationState e) {
+   return ProtoUtils.convertToProtoFormat(e);
+ }
+
+ private LocalizationState convertFromProtoFormat(LocalizationStateProto e) {
+   return ProtoUtils.convertFromProtoFormat(e);
+ }
+
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index f175cf3..3f360d7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -92,12 +93,16 @@ import
org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStateProto;
import org.apache.hadoop.yarn.server.api.ContainerType;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.protobuf.ByteString;
+/**
+ * Utils to convert enum protos to corresponding java enums and vice versa.
+ */
@Private
@Unstable
public class ProtoUtils {
@@ -596,6 +601,21 @@ public class ProtoUtils {
public static ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
+
+ //Localization State
+ private final static String LOCALIZATION_STATE_PREFIX = "L_";
+ public static LocalizationStateProto convertToProtoFormat(
+ LocalizationState e) {
+ return LocalizationStateProto.valueOf(LOCALIZATION_STATE_PREFIX +
e.name());
+ }
+
+ /**
+  * Maps a proto enum constant back to {@link LocalizationState} by removing
+  * the leading LOCALIZATION_STATE_PREFIX from its name.
+  * <p>
+  * Only the leading prefix is stripped. {@code String.replace} would remove
+  * every occurrence of the prefix text, corrupting any constant whose name
+  * contains it again further in (e.g. a word ending in "L" followed by an
+  * underscore).
+  *
+  * @param e the proto enum value.
+  * @return the Java enum constant with the un-prefixed name.
+  */
+ public static LocalizationState convertFromProtoFormat(
+     LocalizationStateProto e) {
+   String name = e.name();
+   if (name.startsWith(LOCALIZATION_STATE_PREFIX)) {
+     name = name.substring(LOCALIZATION_STATE_PREFIX.length());
+   }
+   return LocalizationState.valueOf(name);
+ }
+
}
+
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index dfe7534..34e2198 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -245,5 +247,12 @@ public class TestContainerLaunchRPC {
request) throws YarnException, IOException {
return null;
}
+
+ /** No-op implementation in this test class: always returns null. */
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
index 6e97284..1690b81 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
@@ -31,6 +31,8 @@ import
org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@@ -227,5 +229,12 @@ public class TestContainerResourceIncreaseRPC {
throws YarnException, IOException {
return null;
}
+
+ /** No-op implementation in this test class: always returns null. */
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request)
+ throws YarnException, IOException {
+ return null;
+ }
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 82dfaea..dedabc0 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -37,6 +37,8 @@ import
org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -420,6 +422,13 @@ public class TestRPC {
request) throws YarnException, IOException {
return null;
}
+
+ /** No-op implementation in this test class: always returns null. */
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 256ae87..2ca63ae 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -20,6 +20,9 @@ package
org.apache.hadoop.yarn.server.nodemanager.containermanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
@@ -1963,4 +1966,54 @@ public class ContainerManagerImpl extends
CompositeService implements
dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent());
}
}
+
+ /**
+  * Collects the localization statuses of every container named in the
+  * request. The caller must present an NM token; without one the whole
+  * call is rejected. A container whose lookup throws a YarnException is
+  * reported in the failed-requests map instead of aborting the call.
+  *
+  * @param request carries the ids of the containers to query.
+  * @return per-container statuses plus per-container failures.
+  * @throws YarnException if no NM token identifier can be selected.
+  * @throws IOException declared by the protocol interface.
+  */
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+     GetLocalizationStatusesRequest request) throws YarnException,
+     IOException {
+   UserGroupInformation callerUgi = getRemoteUgi();
+   NMTokenIdentifier nmToken = selectNMTokenIdentifier(callerUgi);
+   if (nmToken == null) {
+     throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
+   }
+   String callerName = callerUgi.getUserName();
+
+   Map<ContainerId, List<LocalizationStatus>> succeeded = new HashMap<>();
+   Map<ContainerId, SerializedException> failed = new HashMap<>();
+   for (ContainerId containerId : request.getContainerIds()) {
+     try {
+       succeeded.put(containerId,
+           getLocalizationStatusesInternal(containerId, nmToken, callerName));
+     } catch (YarnException ye) {
+       failed.put(containerId, SerializedException.newInstance(ye));
+     }
+   }
+   return GetLocalizationStatusesResponse.newInstance(succeeded, failed);
+ }
+
+ /**
+  * Looks up one container, authorizes the caller against it, and returns
+  * that container's localization statuses.
+  *
+  * @param containerID id of the container to query.
+  * @param nmTokenIdentifier NM token identifier of the caller.
+  * @param remoteUser name of the calling user.
+  * @return the container's localization statuses.
+  * @throws YarnException if authorization fails or this NodeManager has
+  *         no such container (recently stopped or never handled).
+  */
+ private List<LocalizationStatus> getLocalizationStatusesInternal(
+     ContainerId containerID,
+     NMTokenIdentifier nmTokenIdentifier, String remoteUser)
+     throws YarnException {
+   Container target = this.context.getContainers().get(containerID);
+
+   LOG.info("Getting localization status for {}", containerID);
+   // Authorization runs before the null check, even for unknown containers.
+   authorizeGetAndStopContainerRequest(containerID, target, false,
+       nmTokenIdentifier, remoteUser);
+
+   if (target != null) {
+     return target.getLocalizationStatuses();
+   }
+
+   String reason = nodeStatusUpdater.isContainerRecentlyStopped(containerID)
+       ? " was recently stopped on node manager."
+       : " is not handled by this NodeManager";
+   throw RPCUtil.getRemoteException(
+       "Container " + containerID.toString() + reason);
+ }
+
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 0565885..5a457c9 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -125,4 +126,10 @@ public interface Container extends
EventHandler<ContainerEvent> {
* @return true/false based on container's state
*/
boolean isContainerInFinalStates();
+
+ /**
+ * Get the localization statuses for this container.
+ * NOTE(review): which resources and states are reported is decided by the
+ * implementation - confirm against the implementing class.
+ * @return list of {@link LocalizationStatus} entries.
+ */
+ List<LocalizationStatus> getLocalizationStatuses();
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 1d6ba2e..cfa6257 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1469,7 +1470,8 @@ public class ContainerImpl implements Container {
ContainerResourceFailedEvent failedEvent =
(ContainerResourceFailedEvent) event;
container.resourceSet
- .resourceLocalizationFailed(failedEvent.getResource());
+ .resourceLocalizationFailed(failedEvent.getResource(),
+ failedEvent.getDiagnosticMessage());
container.addDiagnostics(failedEvent.getDiagnosticMessage());
}
}
@@ -1485,7 +1487,7 @@ public class ContainerImpl implements Container {
ContainerResourceFailedEvent failedEvent =
(ContainerResourceFailedEvent) event;
container.resourceSet.resourceLocalizationFailed(
- failedEvent.getResource());
+ failedEvent.getResource(), failedEvent.getDiagnosticMessage());
container.addDiagnostics("Container aborting re-initialization.. "
+ failedEvent.getDiagnosticMessage());
LOG.error("Container [" + container.getContainerId() + "] Re-init" +
@@ -2288,4 +2290,14 @@ public class ContainerImpl implements Container {
public void setExposedPorts(String ports) {
this.exposedPorts = ports;
}
+
+ /**
+  * Returns the localization statuses reported by this container's
+  * resource set. The read lock is held for the duration of the call.
+  */
+ @Override
+ public List<LocalizationStatus> getLocalizationStatuses() {
+   final List<LocalizationStatus> statuses;
+   this.readLock.lock();
+   try {
+     statuses = resourceSet.getLocalizationStatuses();
+   } finally {
+     this.readLock.unlock();
+   }
+   return statuses;
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
index 745f8a8..95a8031 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
@@ -21,6 +21,8 @@ package
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +50,8 @@ public class ResourceSet {
new ConcurrentHashMap<>();
private Map<LocalResourceRequest, Set<String>> pendingResources =
new ConcurrentHashMap<>();
- private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
- new HashSet<>();
+ private final List<LocalizationStatus> resourcesFailedToBeLocalized =
+ new ArrayList<>();
// resources by visibility (public, private, app)
private final List<LocalResourceRequest> publicRsrcs =
@@ -135,13 +137,20 @@ public class ResourceSet {
}
}
- public void resourceLocalizationFailed(LocalResourceRequest request) {
+ /**
+  * Records a localization failure. The request is removed from the pending
+  * resources and, for every key that was pending under it, a FAILED
+  * {@link LocalizationStatus} carrying the diagnostics is appended to the
+  * failed list. A null request, or one that was not pending, adds nothing.
+  *
+  * @param request the resource request that failed; may be null.
+  * @param diagnostics message stored in each FAILED status.
+  */
+ public void resourceLocalizationFailed(LocalResourceRequest request,
+ String diagnostics) {
// Skip null request when localization failed for running container
if (request == null) {
return;
}
- pendingResources.remove(request);
- resourcesFailedToBeLocalized.add(request);
+ Set<String> keys = pendingResources.remove(request);
+ if (keys != null) {
+ synchronized (resourcesFailedToBeLocalized) {
+ keys.forEach(key ->
+
resourcesFailedToBeLocalized.add(LocalizationStatus.newInstance(key,
+ LocalizationState.FAILED, diagnostics)));
+ }
+ }
}
public synchronized Map<LocalResourceVisibility,
@@ -219,4 +228,30 @@ public class ResourceSet {
}
return merged;
}
+
+ /**
+  * Builds a fresh list with one status per resource key: COMPLETED for
+  * each key in the localized resources, PENDING for each key under a
+  * pending request, followed by the recorded failed statuses.
+  *
+  * @return a newly allocated list of localization statuses.
+  */
+ public List<LocalizationStatus> getLocalizationStatuses() {
+   List<LocalizationStatus> result = new ArrayList<>();
+
+   for (String localizedKey : localizedResources.keySet()) {
+     result.add(LocalizationStatus.newInstance(localizedKey,
+         LocalizationState.COMPLETED));
+   }
+
+   for (Set<String> pendingKeys : pendingResources.values()) {
+     for (String pendingKey : pendingKeys) {
+       result.add(LocalizationStatus.newInstance(pendingKey,
+           LocalizationState.PENDING));
+     }
+   }
+
+   // The failed list is guarded by its own monitor.
+   synchronized (resourcesFailedToBeLocalized) {
+     result.addAll(resourcesFailedToBeLocalized);
+   }
+   return result;
+ }
+
+
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index f78bb6e..e215980 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+import com.google.common.collect.Lists;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import static org.junit.Assert.assertEquals;
@@ -2179,4 +2184,127 @@ public class TestContainerManager extends
BaseContainerManagerTest {
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
.contains("Null resource visibility for local resource"));
}
+
+ /**
+  * Starts one container with a single local resource and checks that the
+  * status call reports exactly that resource as COMPLETED with no failed
+  * requests.
+  */
+ @Test
+ public void testGetLocalizationStatuses() throws Exception {
+   containerManager.start();
+   ContainerId cid = createContainerId(0, 0);
+   Token token = createContainerToken(cid, DUMMY_RM_IDENTIFIER,
+       context.getNodeId(), user, context.getContainerTokenSecretManager());
+
+   // localization resource
+   File script = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
+   PrintWriter scriptWriter = new PrintWriter(script);
+   File outputFile = new File(tmpDir, "file1.txt").getAbsoluteFile();
+   writeScriptFile(scriptWriter, "Upgrade World!", outputFile, cid, false);
+
+   ContainerLaunchContext launchContext =
+       prepareContainerLaunchContext(script, "dest_file1", false, 0);
+
+   // start container
+   List<StartContainerRequest> startRequests = new ArrayList<>();
+   startRequests.add(StartContainerRequest.newInstance(launchContext, token));
+   containerManager.startContainers(
+       StartContainersRequest.newInstance(startRequests));
+   Thread.sleep(5000);
+
+   // Get localization statuses
+   GetLocalizationStatusesResponse statusResponse =
+       containerManager.getLocalizationStatuses(
+           GetLocalizationStatusesRequest.newInstance(
+               Lists.newArrayList(cid)));
+
+   Assert.assertEquals(1,
+       statusResponse.getLocalizationStatuses().get(cid).size());
+   LocalizationStatus first =
+       statusResponse.getLocalizationStatuses().get(cid).get(0);
+   Assert.assertEquals("resource key", "dest_file1",
+       first.getResourceKey());
+   Assert.assertEquals("resource status", LocalizationState.COMPLETED,
+       first.getLocalizationState());
+
+   Assert.assertEquals(0, statusResponse.getFailedRequests().size());
+
+   // stop containers
+   containerManager.stopContainers(
+       StopContainersRequest.newInstance(Lists.newArrayList(cid)));
+ }
+
+ /**
+  * Starts two containers that share one launch context and checks that a
+  * single status call reports, for EACH container, exactly one COMPLETED
+  * resource and no failed requests.
+  */
+ @Test
+ public void testGetLocalizationStatusesMultiContainers() throws Exception {
+   containerManager.start();
+   ContainerId container1 = createContainerId(0, 0);
+   ContainerId container2 = createContainerId(1, 0);
+
+   Token containerToken1 = createContainerToken(container1,
+       DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+       context.getContainerTokenSecretManager());
+   Token containerToken2 = createContainerToken(container2,
+       DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+       context.getContainerTokenSecretManager());
+
+   // localization resource
+   File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
+   PrintWriter fileWriter = new PrintWriter(scriptFile);
+   File file1 = new File(tmpDir, "file1.txt").getAbsoluteFile();
+
+   writeScriptFile(fileWriter, "Upgrade World!", file1, container1, false);
+
+   ContainerLaunchContext containerLaunchContext =
+       prepareContainerLaunchContext(scriptFile, "dest_file1", false, 0);
+
+   StartContainerRequest request1 = StartContainerRequest.newInstance(
+       containerLaunchContext, containerToken1);
+   StartContainerRequest request2 = StartContainerRequest.newInstance(
+       containerLaunchContext, containerToken2);
+
+   List<StartContainerRequest> startRequest = new ArrayList<>();
+   startRequest.add(request1);
+   startRequest.add(request2);
+
+   // start container
+   StartContainersRequest requestList = StartContainersRequest.newInstance(
+       startRequest);
+   containerManager.startContainers(requestList);
+   Thread.sleep(5000);
+
+   // Get localization statuses
+   GetLocalizationStatusesRequest statusRequest =
+       GetLocalizationStatusesRequest.newInstance(
+           Lists.newArrayList(container1, container2));
+
+   GetLocalizationStatusesResponse statusResponse =
+       containerManager.getLocalizationStatuses(statusRequest);
+   Assert.assertEquals(2, statusResponse.getLocalizationStatuses().size());
+
+   ContainerId[] containerIds = {container1, container2};
+   Arrays.stream(containerIds).forEach(cntnId -> {
+     // Look up the container being iterated; querying container1 on every
+     // pass would leave container2 unverified.
+     List<LocalizationStatus> statuses = statusResponse
+         .getLocalizationStatuses().get(cntnId);
+     Assert.assertEquals(1, statuses.size());
+     LocalizationStatus status = statuses.get(0);
+     Assert.assertEquals("resource key", "dest_file1",
+         status.getResourceKey());
+     Assert.assertEquals("resource status", LocalizationState.COMPLETED,
+         status.getLocalizationState());
+   });
+
+   Assert.assertEquals(0, statusResponse.getFailedRequests().size());
+
+   // stop containers
+   StopContainersRequest stopRequest =
+       StopContainersRequest.newInstance(Lists.newArrayList(container1,
+           container2));
+   containerManager.stopContainers(stopRequest);
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java
new file mode 100644
index 0000000..12d8c84
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LocalizationState;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests of {@link ResourceSet}.
+ */
+public class TestResourceSet {
+
+  /** Expects a newly added resource to be reported as PENDING. */
+  @Test
+  public void testGetPendingLS() throws URISyntaxException {
+    LocalResource file = LocalResource.newInstance(
+        URL.fromPath(new Path("/tmp/file1.txt")), LocalResourceType.FILE,
+        LocalResourceVisibility.PRIVATE, 0, System.currentTimeMillis());
+    Map<String, LocalResource> toLocalize = new HashMap<>();
+    toLocalize.put("resource1", file);
+    ResourceSet underTest = new ResourceSet();
+    underTest.addResources(toLocalize);
+
+    Assert.assertEquals("num statuses", 1,
+        underTest.getLocalizationStatuses().size());
+    LocalizationStatus reported =
+        underTest.getLocalizationStatuses().iterator().next();
+    Assert.assertEquals("status", LocalizationState.PENDING,
+        reported.getLocalizationState());
+  }
+
+  /** Expects COMPLETED once the resource has been marked localized. */
+  @Test
+  public void testGetCompletedLS() throws URISyntaxException {
+    LocalResource file = LocalResource.newInstance(
+        URL.fromPath(new Path("/tmp/file1.txt")), LocalResourceType.FILE,
+        LocalResourceVisibility.PRIVATE, 0, System.currentTimeMillis());
+    Map<String, LocalResource> toLocalize = new HashMap<>();
+    toLocalize.put("resource1", file);
+    ResourceSet underTest = new ResourceSet();
+    underTest.addResources(toLocalize);
+
+    underTest.resourceLocalized(new LocalResourceRequest(file),
+        new Path("file1.txt"));
+
+    Assert.assertEquals("num statuses", 1,
+        underTest.getLocalizationStatuses().size());
+    LocalizationStatus reported =
+        underTest.getLocalizationStatuses().iterator().next();
+    Assert.assertEquals("status", LocalizationState.COMPLETED,
+        reported.getLocalizationState());
+  }
+
+  /**
+   * Expects FAILED, with the failure diagnostics, after a failed localization.
+   */
+  @Test
+  public void testGetFailedLS() throws URISyntaxException {
+    LocalResource file = LocalResource.newInstance(
+        URL.fromPath(new Path("/tmp/file1.txt")), LocalResourceType.FILE,
+        LocalResourceVisibility.PRIVATE, 0, System.currentTimeMillis());
+    Map<String, LocalResource> toLocalize = new HashMap<>();
+    toLocalize.put("resource1", file);
+    ResourceSet underTest = new ResourceSet();
+    underTest.addResources(toLocalize);
+
+    String reason = "file does not exist";
+    underTest.resourceLocalizationFailed(new LocalResourceRequest(file),
+        reason);
+
+    Assert.assertEquals("num statuses", 1,
+        underTest.getLocalizationStatuses().size());
+    LocalizationStatus reported =
+        underTest.getLocalizationStatuses().iterator().next();
+    Assert.assertEquals("status", LocalizationState.FAILED,
+        reported.getLocalizationState());
+    Assert.assertEquals("diagnostics", reason, reported.getDiagnostics());
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index f1b39bd..980f29b 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -272,4 +273,9 @@ public class MockContainer implements Container {
@Override public boolean isContainerInFinalStates() {
return false;
}
+
+ @Override
+ public List<LocalizationStatus> getLocalizationStatuses() {
+ return null; // mock stub: always yields null instead of a status list
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
index ee974e3..440d971 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
@@ -28,6 +28,8 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@@ -370,4 +372,11 @@ public class NodeManager implements
ContainerManagementProtocol {
throws YarnException, IOException {
return null;
}
+
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null; // stub: the request is ignored and no response is built
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
index 1acf658..50e5865d 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -216,6 +218,13 @@ public class TestAMAuthorization {
throws YarnException, IOException {
return null;
}
+
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null; // stub: the request is ignored and no response is built
+ }
}
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 03ccd76..8fb1aa8 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -41,6 +41,8 @@ import
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -212,6 +214,13 @@ public class TestApplicationMasterLauncher {
request) throws YarnException, IOException {
return null;
}
+
+ @Override
+ public GetLocalizationStatusesResponse getLocalizationStatuses(
+ GetLocalizationStatusesRequest request) throws YarnException,
+ IOException {
+ return null; // stub: the request is ignored and no response is built
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]