Repository: hadoop Updated Branches: refs/heads/branch-2 ae35b0e14 -> 3d4536af8
YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) (cherry picked from commit 834b3b98e7ecc4bfe93c7dd1a9e928e9653cf138) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3d4536af Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3d4536af Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3d4536af Branch: refs/heads/branch-2 Commit: 3d4536af823caf908cfd158f4a98d09c11105bc1 Parents: ae35b0e Author: Karthik Kambatla <[email protected]> Authored: Tue Nov 25 16:21:29 2014 -0800 Committer: Karthik Kambatla <[email protected]> Committed: Tue Nov 25 16:22:14 2014 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 5 +- .../hadoop-yarn/hadoop-yarn-api/pom.xml | 1 + .../hadoop/yarn/api/ClientSCMProtocol.java | 90 ++++++ .../hadoop/yarn/api/ClientSCMProtocolPB.java | 28 ++ .../ReleaseSharedCacheResourceRequest.java | 67 +++++ .../ReleaseSharedCacheResourceResponse.java | 37 +++ .../UseSharedCacheResourceRequest.java | 70 +++++ .../UseSharedCacheResourceResponse.java | 55 ++++ .../hadoop/yarn/conf/YarnConfiguration.java | 12 + .../src/main/proto/client_SCM_protocol.proto | 30 ++ .../src/main/proto/yarn_service_protos.proto | 21 ++ .../client/ClientSCMProtocolPBClientImpl.java | 93 +++++++ .../service/ClientSCMProtocolPBServiceImpl.java | 78 ++++++ ...ReleaseSharedCacheResourceRequestPBImpl.java | 122 ++++++++ ...eleaseSharedCacheResourceResponsePBImpl.java | 53 ++++ .../pb/UseSharedCacheResourceRequestPBImpl.java | 120 ++++++++ .../UseSharedCacheResourceResponsePBImpl.java | 79 ++++++ .../src/main/resources/yarn-default.xml | 23 +- .../ClientProtocolService.java | 192 +++++++++++++ .../sharedcachemanager/SharedCacheManager.java | 7 + .../metrics/ClientSCMMetrics.java | 113 ++++++++ .../TestClientSCMProtocolService.java | 278 +++++++++++++++++++ 22 files changed, 1570 insertions(+), 4 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 29f2797..1741d61 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -19,7 +19,10 @@ Release 2.7.0 - UNRELEASED (Chris Trezzo and Sangjin Lee via kasha) YARN-2236. [YARN-1492] Shared Cache uploader service on the Node - Manager. (Chris Trezzo and Sanjin Lee via kasha) + Manager. (Chris Trezzo and Sangjin Lee via kasha) + + YARN-2188. [YARN-1492] Client service for cache manager. + (Chris Trezzo and Sangjin Lee via kasha) IMPROVEMENTS http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml index 35983d3..1331b97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml @@ -96,6 +96,7 @@ <include>server/resourcemanager_administration_protocol.proto</include> <include>application_history_client.proto</include> <include>server/application_history_server.proto</include> + <include>client_SCM_protocol.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java new file mode 100644 index 0000000..d63fa11 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * <p> + * The protocol between clients and the <code>SharedCacheManager</code> to claim + * and release resources in the shared cache. 
+ * </p> + */ +@Public +@Unstable +public interface ClientSCMProtocol { + /** + * <p> + * The interface used by clients to claim a resource with the + * <code>SharedCacheManager</code>. The client uses a checksum to identify the + * resource and an {@link ApplicationId} to identify which application will be + * using the resource. + * </p> + * + * <p> + * The <code>SharedCacheManager</code> responds with whether or not the + * resource exists in the cache. If the resource exists, a <code>Path</code> + * to the resource in the shared cache is returned. If the resource does not + * exist, the response is empty. + * </p> + * + * @param request request to claim a resource in the shared cache + * @return response indicating if the resource is already in the cache + * @throws YarnException + * @throws IOException + */ + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, IOException; + + /** + * <p> + * The interface used by clients to release a resource with the + * <code>SharedCacheManager</code>. This method is called once an application + * is no longer using a claimed resource in the shared cache. The client uses + * a checksum to identify the resource and an {@link ApplicationId} to + * identify which application is releasing the resource. + * </p> + * + * <p> + * Note: This method is an optimization and the client is not required to call + * it for correctness. + * </p> + * + * <p> + * Currently the <code>SharedCacheManager</code> sends an empty response.
+ * </p> + * + * @param request request to release a resource in the shared cache + * @return (empty) response on releasing the resource + * @throws YarnException + * @throws IOException + */ + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java new file mode 100644 index 0000000..b0a9fb5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.ClientSCMProtocol.ClientSCMProtocolService; + +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientSCMProtocolPB", + protocolVersion = 1) +public interface ClientSCMProtocolPB extends + ClientSCMProtocolService.BlockingInterface { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java new file mode 100644 index 0000000..a8d36b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * <p>The request from clients to release a resource in the shared cache.</p> + */ +@Public +@Unstable +public abstract class ReleaseSharedCacheResourceRequest { + + /** + * Get the <code>ApplicationId</code> of the releasing application. + * + * @return <code>ApplicationId</code> + */ + @Public + @Unstable + public abstract ApplicationId getAppId(); + + /** + * Set the <code>ApplicationId</code> of the releasing application. + * + * @param id <code>ApplicationId</code> + */ + @Public + @Unstable + public abstract void setAppId(ApplicationId id); + + /** + * Get the <code>key</code> of the resource to be released. + * + * @return <code>key</code> + */ + @Public + @Unstable + public abstract String getResourceKey(); + + /** + * Set the <code>key</code> of the resource to be released.
+ * + * @param key unique identifier for the resource + */ + @Public + @Unstable + public abstract void setResourceKey(String key); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java new file mode 100644 index 0000000..c075e74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * <p> + * The response to clients from the <code>SharedCacheManager</code> when + * releasing a resource in the shared cache. + * </p> + * + * <p> + * Currently, this is empty. + * </p> + */ +@Public +@Unstable +public abstract class ReleaseSharedCacheResourceResponse { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java new file mode 100644 index 0000000..bd42b7d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * <p> + * The request from clients to the <code>SharedCacheManager</code> that claims a + * resource in the shared cache. + * </p> + */ +@Public +@Unstable +public abstract class UseSharedCacheResourceRequest { + + /** + * Get the <code>ApplicationId</code> of the application using the resource. + * + * @return <code>ApplicationId</code> + */ + @Public + @Unstable + public abstract ApplicationId getAppId(); + + /** + * Set the <code>ApplicationId</code> of the application using the resource. + * + * @param id <code>ApplicationId</code> + */ + @Public + @Unstable + public abstract void setAppId(ApplicationId id); + + /** + * Get the <code>key</code> of the resource to be used. + * + * @return <code>key</code> + */ + @Public + @Unstable + public abstract String getResourceKey(); + + /** + * Set the <code>key</code> of the resource to be used.
+ * + * @param key unique identifier for the resource + */ + @Public + @Unstable + public abstract void setResourceKey(String key); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java new file mode 100644 index 0000000..87fb43b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * <p> + * The response from the SharedCacheManager to the client that indicates whether + * a requested resource exists in the cache. + * </p> + */ +@Public +@Unstable +public abstract class UseSharedCacheResourceResponse { + + /** + * Get the <code>Path</code> corresponding to the requested resource in the + * shared cache. + * + * @return String A <code>Path</code> if the resource exists in the shared + * cache, <code>null</code> otherwise + */ + @Public + @Unstable + public abstract String getPath(); + + /** + * Set the <code>Path</code> corresponding to a resource in the shared cache. + * + * @param p A <code>Path</code> corresponding to a resource in the shared + * cache + */ + @Public + @Unstable + public abstract void setPath(String p); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fada0ea..d4d2fa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1472,6 +1472,18 @@ public class YarnConfiguration extends Configuration { SHARED_CACHE_PREFIX + "uploader.server.thread-count"; public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50; + /** The address of the client interface in the SCM. 
*/ + public static final String SCM_CLIENT_SERVER_ADDRESS = + SHARED_CACHE_PREFIX + "client-server.address"; + public static final int DEFAULT_SCM_CLIENT_SERVER_PORT = 8045; + public static final String DEFAULT_SCM_CLIENT_SERVER_ADDRESS = "0.0.0.0:" + + DEFAULT_SCM_CLIENT_SERVER_PORT; + + /** The number of threads used to handle shared cache manager requests. */ + public static final String SCM_CLIENT_SERVER_THREAD_COUNT = + SHARED_CACHE_PREFIX + "client-server.thread-count"; + public static final int DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT = 50; + /** the checksum algorithm implementation **/ public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL = SHARED_CACHE_PREFIX + "checksum.algo.impl"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto new file mode 100644 index 0000000..fbc3c42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "ClientSCMProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; + +service ClientSCMProtocolService { + rpc use (UseSharedCacheResourceRequestProto) returns (UseSharedCacheResourceResponseProto); + rpc release (ReleaseSharedCacheResourceRequestProto) returns (ReleaseSharedCacheResourceResponseProto); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 1bde69a..10f5b9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -307,6 +307,27 @@ message GetContainersResponseProto { } ////////////////////////////////////////////////////// +/////// client_SCM_Protocol ////////////////////////// +////////////////////////////////////////////////////// + +message UseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message UseSharedCacheResourceResponseProto { + optional string path = 1; +} + +message ReleaseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message ReleaseSharedCacheResourceResponseProto { +} + +////////////////////////////////////////////////////// // reservation_protocol 
////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java new file mode 100644 index 0000000..79bfaca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.ClientSCMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; + +import com.google.protobuf.ServiceException; + +public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol, + Closeable { + + private ClientSCMProtocolPB proxy; + + public ClientSCMProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class, + ProtobufRpcEngine.class); + proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + this.proxy = 
null; + } + } + + @Override + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, IOException { + UseSharedCacheResourceRequestProto requestProto = + ((UseSharedCacheResourceRequestPBImpl) request).getProto(); + try { + return new UseSharedCacheResourceResponsePBImpl(proxy.use(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, + IOException { + ReleaseSharedCacheResourceRequestProto requestProto = + ((ReleaseSharedCacheResourceRequestPBImpl) request).getProto(); + try { + return new ReleaseSharedCacheResourceResponsePBImpl(proxy.release(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java new file mode 100644 index 0000000..65b3581 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.ClientSCMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class ClientSCMProtocolPBServiceImpl implements ClientSCMProtocolPB { + + private ClientSCMProtocol real; + + 
public ClientSCMProtocolPBServiceImpl(ClientSCMProtocol impl) { + this.real = impl; + } + + @Override + public UseSharedCacheResourceResponseProto use(RpcController controller, + UseSharedCacheResourceRequestProto proto) throws ServiceException { + UseSharedCacheResourceRequestPBImpl request = + new UseSharedCacheResourceRequestPBImpl(proto); + try { + UseSharedCacheResourceResponse response = real.use(request); + return ((UseSharedCacheResourceResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReleaseSharedCacheResourceResponseProto release( + RpcController controller, ReleaseSharedCacheResourceRequestProto proto) + throws ServiceException { + ReleaseSharedCacheResourceRequestPBImpl request = + new ReleaseSharedCacheResourceRequestPBImpl(proto); + try { + ReleaseSharedCacheResourceResponse response = real.release(request); + return ((ReleaseSharedCacheResourceResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java new file mode 100644 index 0000000..d16ef78 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProtoOrBuilder; + +public class ReleaseSharedCacheResourceRequestPBImpl extends + ReleaseSharedCacheResourceRequest { + ReleaseSharedCacheResourceRequestProto proto = + ReleaseSharedCacheResourceRequestProto.getDefaultInstance(); + ReleaseSharedCacheResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public ReleaseSharedCacheResourceRequestPBImpl() { + builder = 
ReleaseSharedCacheResourceRequestProto.newBuilder(); + } + + public ReleaseSharedCacheResourceRequestPBImpl( + ReleaseSharedCacheResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReleaseSharedCacheResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ApplicationId getAppId() { + ReleaseSharedCacheResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setAppId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) + builder.clearApplicationId(); + this.applicationId = id; + } + + @Override + public String getResourceKey() { + ReleaseSharedCacheResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + return (p.hasResourceKey()) ? 
p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToBuilder() { + if (applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReleaseSharedCacheResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java new file mode 100644 index 0000000..559f2c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto; + +public class ReleaseSharedCacheResourceResponsePBImpl extends + ReleaseSharedCacheResourceResponse { + ReleaseSharedCacheResourceResponseProto proto = + ReleaseSharedCacheResourceResponseProto.getDefaultInstance(); + ReleaseSharedCacheResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + public ReleaseSharedCacheResourceResponsePBImpl() { + builder = ReleaseSharedCacheResourceResponseProto.newBuilder(); + } + + public ReleaseSharedCacheResourceResponsePBImpl( + ReleaseSharedCacheResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReleaseSharedCacheResourceResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReleaseSharedCacheResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java new file mode 100644 index 0000000..2a134b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProtoOrBuilder; + +public class UseSharedCacheResourceRequestPBImpl extends + UseSharedCacheResourceRequest { + UseSharedCacheResourceRequestProto proto = UseSharedCacheResourceRequestProto + .getDefaultInstance(); + UseSharedCacheResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public UseSharedCacheResourceRequestPBImpl() { + builder = UseSharedCacheResourceRequestProto.newBuilder(); + } + + public UseSharedCacheResourceRequestPBImpl( + UseSharedCacheResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UseSharedCacheResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ApplicationId getAppId() { + UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setAppId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) + builder.clearApplicationId(); + this.applicationId = id; + } + + @Override + public String getResourceKey() { + UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? 
p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToBuilder() { + if (applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UseSharedCacheResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java new file mode 100644 index 0000000..0dd44c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProtoOrBuilder; + +public class UseSharedCacheResourceResponsePBImpl extends + UseSharedCacheResourceResponse { + UseSharedCacheResourceResponseProto proto = + UseSharedCacheResourceResponseProto + .getDefaultInstance(); + UseSharedCacheResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + public UseSharedCacheResourceResponsePBImpl() { + builder = UseSharedCacheResourceResponseProto.newBuilder(); + } + + public UseSharedCacheResourceResponsePBImpl( + UseSharedCacheResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UseSharedCacheResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getPath() { + UseSharedCacheResourceResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasPath()) ? 
p.getPath() : null; + } + + @Override + public void setPath(String path) { + maybeInitBuilder(); + if (path == null) { + builder.clearPath(); + return; + } + builder.setPath(path); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UseSharedCacheResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 73aa816..b7bdfc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1458,19 +1458,36 @@ </property> <property> - <description>The algorithm used to compute checksums of files (SHA-256 by default)</description> + <description>The address of the client interface in the SCM + (shared cache manager)</description> + <name>yarn.sharedcache.client-server.address</name> + <value>0.0.0.0:8045</value> + </property> + + <property> + <description>The number of threads used to handle shared cache manager + requests from clients (50 by default)</description> + <name>yarn.sharedcache.client-server.thread-count</name> + <value>50</value> + </property> + + <property> + <description>The algorithm used to compute checksums of files (SHA-256 by + default)</description> <name>yarn.sharedcache.checksum.algo.impl</name> <value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value> </property> <property> - <description>The replication factor for the node 
manager uploader for the shared cache (10 by default)</description> + <description>The replication factor for the node manager uploader for the + shared cache (10 by default)</description> <name>yarn.sharedcache.nm.uploader.replication.factor</name> <value>10</value> </property> <property> - <description>The number of threads used to upload files from a node manager instance (20 by default)</description> + <description>The number of threads used to upload files from a node manager + instance (20 by default)</description> <name>yarn.sharedcache.nm.uploader.thread-count</name> <value>20</value> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java new file mode 100644 index 0000000..bd13573 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import 
org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference; + +/** + * This service handles all rpc calls from the client to the shared cache + * manager. + */ +@Private +@Evolving +public class ClientProtocolService extends AbstractService implements + ClientSCMProtocol { + + private static final Log LOG = LogFactory.getLog(ClientProtocolService.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress clientBindAddress; + private final SCMStore store; + private int cacheDepth; + private String cacheRoot; + private ClientSCMMetrics metrics; + + public ClientProtocolService(SCMStore store) { + super(ClientProtocolService.class.getName()); + this.store = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.clientBindAddress = getBindAddress(conf); + + this.cacheDepth = SharedCacheUtil.getCacheDepth(conf); + + this.cacheRoot = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + this.metrics = ClientSCMMetrics.initSingleton(conf); + + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(ClientSCMProtocol.class, this, + clientBindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_CLIENT_SERVER_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT)); + + // TODO (YARN-2774): Enable service authorization + + this.server.start(); + clientBindAddress = + 
conf.updateConnectAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + + super.serviceStop(); + } + + @Override + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, + IOException { + + UseSharedCacheResourceResponse response = + recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + String fileName = + this.store.addResourceReference(request.getResourceKey(), + new SharedCacheResourceReference(request.getAppId(), + callerUGI.getShortUserName())); + + if (fileName != null) { + response + .setPath(getCacheEntryFilePath(request.getResourceKey(), fileName)); + this.metrics.incCacheHitCount(); + } else { + this.metrics.incCacheMissCount(); + } + + return response; + } + + @Override + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, + IOException { + + ReleaseSharedCacheResourceResponse response = + recordFactory + .newRecordInstance(ReleaseSharedCacheResourceResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + boolean removed = + this.store.removeResourceReference( + request.getResourceKey(), + new SharedCacheResourceReference(request.getAppId(), callerUGI + .getShortUserName()), true); + + if (removed) { + this.metrics.incCacheRelease(); + } + + return response; + } + + private String getCacheEntryFilePath(String checksum, String filename) { + return 
SharedCacheUtil.getCacheEntryPath(this.cacheDepth, + this.cacheRoot, checksum) + Path.SEPARATOR_CHAR + filename; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index ab50727..c54e470 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -71,6 +71,9 @@ public class SharedCacheManager extends CompositeService { createNMCacheUploaderSCMProtocolService(store); addService(nms); + ClientProtocolService cps = createClientProtocolService(store); + addService(cps); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -106,6 +109,10 @@ public class SharedCacheManager extends CompositeService { return new SharedCacheUploaderService(store); } + private ClientProtocolService createClientProtocolService(SCMStore store) { + return new ClientProtocolService(store); + } + @Override protected void serviceStop() throws Exception { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java new file mode 100644 index 0000000..3ae88e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining client requests metrics + * and publishing them through the metrics interfaces. + */ +@Private +@Unstable +@Metrics(about="Client SCM metrics", context="yarn") +public class ClientSCMMetrics { + + private static final Log LOG = LogFactory.getLog(ClientSCMMetrics.class); + final MetricsRegistry registry; + + ClientSCMMetrics() { + registry = new MetricsRegistry("clientRequests"); + LOG.debug("Initialized " + registry); + } + + enum Singleton { + INSTANCE; + + ClientSCMMetrics impl; + + synchronized ClientSCMMetrics init(Configuration conf) { + if (impl == null) { + impl = create(); + } + return impl; + } + } + + public static ClientSCMMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static ClientSCMMetrics getInstance() { + ClientSCMMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) { + throw new IllegalStateException( + "The ClientSCMMetrics singleton instance is not initialized." 
+ + " Have you called init first?"); + } + return topMetrics; + } + + static ClientSCMMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + ClientSCMMetrics metrics = new ClientSCMMetrics(); + ms.register("clientRequests", null, metrics); + return metrics; + } + + @Metric("Number of cache hits") MutableCounterLong cacheHits; + @Metric("Number of cache misses") MutableCounterLong cacheMisses; + @Metric("Number of cache releases") MutableCounterLong cacheReleases; + + /** + * One cache hit event + */ + public void incCacheHitCount() { + cacheHits.incr(); + } + + /** + * One cache miss event + */ + public void incCacheMissCount() { + cacheMisses.incr(); + } + + /** + * One cache release event + */ + public void incCacheRelease() { + cacheReleases.incr(); + } + + public long getCacheHits() { return cacheHits.value(); } + public long getCacheMisses() { return cacheMisses.value(); } + public long getCacheReleases() { return cacheReleases.value(); } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d4536af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java new file mode 100644 index 0000000..68f9851 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.sharedcachemanager;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ClientSCMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;


/**
 * Basic unit tests for the Client to SCM Protocol Service.
 *
 * <p>Each test starts an in-memory store and a {@link ClientProtocolService}
 * on top of it, talks to the service through an RPC proxy, and checks both
 * the store contents and the change in the {@link ClientSCMMetrics} counters.
 */
public class TestClientSCMProtocolService {
  private static File testDir = null;

  /**
   * Creates the scratch directory used as the shared cache root.
   */
  @BeforeClass
  public static void setupTestDirs() throws IOException {
    // Name the directory after this test class so that it cannot collide
    // with the directory of another test class running in the same module.
    testDir = new File("target",
        TestClientSCMProtocolService.class.getCanonicalName());
    testDir.delete();
    testDir.mkdirs();
    testDir = testDir.getAbsoluteFile();
  }

  /**
   * Removes the scratch directory created by {@link #setupTestDirs()}.
   */
  @AfterClass
  public static void cleanupTestDirs() throws IOException {
    if (testDir != null) {
      testDir.delete();
    }
  }


  private ClientProtocolService service;
  private ClientSCMProtocol clientSCMProxy;
  private SCMStore store;
  private final RecordFactory recordFactory = RecordFactoryProvider
      .getRecordFactory(null);

  /**
   * Starts the store and the client protocol service, then opens an RPC
   * proxy to the service address taken from the configuration.
   */
  @Before
  public void startUp() {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.SCM_STORE_CLASS,
        InMemorySCMStore.class.getName());
    conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
    AppChecker appChecker = mock(AppChecker.class);
    store = new InMemorySCMStore(appChecker);
    store.init(conf);
    store.start();

    service = new ClientProtocolService(store);
    service.init(conf);
    service.start();

    YarnRPC rpc = YarnRPC.create(new Configuration());

    InetSocketAddress scmAddress =
        conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
            YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
            YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);

    clientSCMProxy =
        (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, scmAddress,
            conf);
  }

  /**
   * Tears everything down in the reverse order of {@link #startUp()}: the
   * client proxy first, then the service, and the store last, because the
   * service was constructed on top of the store.
   */
  @After
  public void cleanUp() {
    if (clientSCMProxy != null) {
      RPC.stopProxy(clientSCMProxy);
      clientSCMProxy = null;
    }

    if (service != null) {
      service.stop();
      service = null;
    }

    if (store != null) {
      store.stop();
      store = null;
    }
  }

  @Test
  public void testUse_MissingEntry() throws Exception {
    long misses = ClientSCMMetrics.getInstance().getCacheMisses();
    UseSharedCacheResourceRequest request =
        newUseRequest("key1", createAppId(1, 1L));
    assertNull(clientSCMProxy.use(request).getPath());
    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
        .getInstance().getCacheMisses() - misses);
  }

  @Test
  public void testUse_ExistingEntry_NoAppIds() throws Exception {
    // Pre-populate the SCM with one cache entry
    store.addResource("key1", "foo.jar");

    long hits = ClientSCMMetrics.getInstance().getCacheHits();

    UseSharedCacheResourceRequest request =
        newUseRequest("key1", createAppId(2, 2L));
    // Expecting default depth of 3 and under the shared cache root dir
    assertEquals(expectedKey1Path(), clientSCMProxy.use(request).getPath());
    assertEquals(1, store.getResourceReferences("key1").size());
    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
        .getInstance().getCacheHits() - hits);

  }

  @Test
  public void testUse_ExistingEntry_OneId() throws Exception {
    // Pre-populate the SCM with one cache entry
    store.addResource("key1", "foo.jar");
    store.addResourceReference("key1",
        new SharedCacheResourceReference(createAppId(1, 1L), "user"));
    assertEquals(1, store.getResourceReferences("key1").size());
    long hits = ClientSCMMetrics.getInstance().getCacheHits();

    // Add a new distinct appId
    UseSharedCacheResourceRequest request =
        newUseRequest("key1", createAppId(2, 2L));

    // Expecting default depth of 3 under the shared cache root dir
    assertEquals(expectedKey1Path(), clientSCMProxy.use(request).getPath());
    assertEquals(2, store.getResourceReferences("key1").size());
    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
        .getInstance().getCacheHits() - hits);
  }

  @Test
  public void testUse_ExistingEntry_DupId() throws Exception {
    // Pre-populate the SCM with one cache entry
    store.addResource("key1", "foo.jar");
    UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
    store.addResourceReference("key1",
        new SharedCacheResourceReference(createAppId(1, 1L),
            testUGI.getShortUserName()));
    assertEquals(1, store.getResourceReferences("key1").size());

    long hits = ClientSCMMetrics.getInstance().getCacheHits();

    // Add a new duplicate appId
    UseSharedCacheResourceRequest request =
        newUseRequest("key1", createAppId(1, 1L));

    // Expecting default depth of 3 under the shared cache root dir
    assertEquals(expectedKey1Path(), clientSCMProxy.use(request).getPath());
    assertEquals(1, store.getResourceReferences("key1").size());

    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
        .getInstance().getCacheHits() - hits);
  }

  @Test
  public void testRelease_ExistingEntry_NonExistantAppId() throws Exception {
    // Pre-populate the SCM with one cache entry
    store.addResource("key1", "foo.jar");
    store.addResourceReference("key1",
        new SharedCacheResourceReference(createAppId(1, 1L), "user"));
    assertEquals(1, store.getResourceReferences("key1").size());

    long releases = ClientSCMMetrics.getInstance().getCacheReleases();

    // Release with an appId that holds no reference on the resource
    ReleaseSharedCacheResourceRequest request =
        newReleaseRequest("key1", createAppId(2, 2L));
    clientSCMProxy.release(request);
    assertEquals(1, store.getResourceReferences("key1").size());

    assertEquals(
        "Client SCM metrics were updated when a release did not happen", 0,
        ClientSCMMetrics.getInstance().getCacheReleases() - releases);

  }

  @Test
  public void testRelease_ExistingEntry_WithAppId() throws Exception {
    // Pre-populate the SCM with one cache entry
    store.addResource("key1", "foo.jar");
    UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
    store.addResourceReference("key1",
        new SharedCacheResourceReference(createAppId(1, 1L),
            testUGI.getShortUserName()));
    assertEquals(1, store.getResourceReferences("key1").size());

    long releases = ClientSCMMetrics.getInstance().getCacheReleases();

    ReleaseSharedCacheResourceRequest request =
        newReleaseRequest("key1", createAppId(1, 1L));
    clientSCMProxy.release(request);
    assertEquals(0, store.getResourceReferences("key1").size());

    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
        .getInstance().getCacheReleases() - releases);

  }

  @Test
  public void testRelease_MissingEntry() throws Exception {

    long releases = ClientSCMMetrics.getInstance().getCacheReleases();

    ReleaseSharedCacheResourceRequest request =
        newReleaseRequest("key2", createAppId(2, 2L));
    clientSCMProxy.release(request);
    assertNotNull(store.getResourceReferences("key2"));
    assertEquals(0, store.getResourceReferences("key2").size());
    assertEquals(
        "Client SCM metrics were updated when a release did not happen.", 0,
        ClientSCMMetrics.getInstance().getCacheReleases() - releases);
  }

  /** Builds a "use" request for the given resource key and application. */
  private UseSharedCacheResourceRequest newUseRequest(String resourceKey,
      ApplicationId appId) {
    UseSharedCacheResourceRequest request =
        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
    request.setResourceKey(resourceKey);
    request.setAppId(appId);
    return request;
  }

  /** Builds a "release" request for the given resource key and application. */
  private ReleaseSharedCacheResourceRequest newReleaseRequest(
      String resourceKey, ApplicationId appId) {
    ReleaseSharedCacheResourceRequest request =
        recordFactory
            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
    request.setResourceKey(resourceKey);
    request.setAppId(appId);
    return request;
  }

  /** Path the tests expect for "foo.jar" stored under resource key "key1". */
  private String expectedKey1Path() {
    return testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
  }

  /** Creates an application id from the given sequence id and timestamp. */
  private ApplicationId createAppId(int id, long timestamp) {
    return ApplicationId.newInstance(timestamp, id);
  }
}
