Author: jing9
Date: Wed Jul 24 07:48:36 2013
New Revision: 1506426
URL: http://svn.apache.org/r1506426
Log:
HADOOP-9762. RetryCache utility for implementing RPC retries. Contributed by
Suresh Srinivas.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1506426&r1=1506425&r2=1506426&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Wed Jul
24 07:48:36 2013
@@ -372,6 +372,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9763. Extends LightWeightGSet to support eviction of expired
elements. (Tsz Wo (Nicholas) SZE via jing9)
+ HADOOP-9762. RetryCache utility for implementing RPC retries.
+ (Suresh Srinivas via jing9)
+
IMPROVEMENTS
HADOOP-9164. Print paths of loaded native libraries in
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java?rev=1506426&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
(added)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
Wed Jul 24 07:48:36 2013
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.LightWeightCache;
+import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Maintains a cache of non-idempotent requests that have been successfully
+ * processed by the RPC server implementation, to handle the retries. A request
+ * is uniquely identified by the unique client ID + call ID of the RPC request.
+ * On receiving retried request, an entry will be found in the
+ * {@link RetryCache} and the previous response is sent back to the request.
+ * <p>
+ * For an example implementation using this cache, see the HDFS FSNamesystem class.
+ */
[email protected]
+public class RetryCache {
+ public static final Log LOG = LogFactory.getLog(RetryCache.class);
+  /**
+   * CacheEntry is tracked using unique client ID and callId of the RPC request.
+   * <p>
+   * Equality and hash code are derived only from the client ID and the call
+   * ID; the processing state and the expiration time do not take part.
+   */
+  public static class CacheEntry implements LightWeightCache.Entry {
+    /**
+     * Processing state of the requests. These are constants and therefore
+     * declared final so that no code can reassign them by accident.
+     */
+    private static final byte INPROGRESS = 0;
+    private static final byte SUCCESS = 1;
+    private static final byte FAILED = 2;
+
+    /** Current processing state; a new entry starts as in progress. */
+    private volatile byte state = INPROGRESS;
+
+    // Store uuid as two long for better memory utilization
+    private final long clientIdMsb; // Most significant bytes
+    private final long clientIdLsb; // Least significant bytes
+
+    private final int callId;
+    private final long expirationTime;
+    private LightWeightGSet.LinkedElement next;
+
+    /**
+     * @param clientId client ID of the RPC request; must be exactly 16 bytes
+     * @param callId call ID of the RPC request
+     * @param expirationTime value reported by {@link #getExpirationTime()}
+     * @throws IllegalArgumentException if {@code clientId} is not 16 bytes long
+     */
+    CacheEntry(byte[] clientId, int callId, long expirationTime) {
+      Preconditions.checkArgument(clientId.length == 16, "Invalid clientId");
+      // Convert UUID bytes to two longs, first eight bytes into the most
+      // significant long and the last eight into the least significant one.
+      long tmp = 0;
+      for (int i = 0; i < 8; i++) {
+        tmp = (tmp << 8) | (clientId[i] & 0xff);
+      }
+      clientIdMsb = tmp;
+      tmp = 0;
+      for (int i = 8; i < 16; i++) {
+        tmp = (tmp << 8) | (clientId[i] & 0xff);
+      }
+      clientIdLsb = tmp;
+      this.callId = callId;
+      this.expirationTime = expirationTime;
+    }
+
+    /** Folds a long into an int by xor-ing its upper and lower halves. */
+    private static int hashCode(long value) {
+      return (int)(value ^ (value >>> 32));
+    }
+
+    @Override
+    public int hashCode() {
+      return (hashCode(clientIdMsb) * 31 + hashCode(clientIdLsb)) * 31 + callId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof CacheEntry)) {
+        return false;
+      }
+      CacheEntry other = (CacheEntry) obj;
+      return callId == other.callId && clientIdMsb == other.clientIdMsb
+          && clientIdLsb == other.clientIdLsb;
+    }
+
+    @Override
+    public void setNext(LinkedElement next) {
+      this.next = next;
+    }
+
+    @Override
+    public LinkedElement getNext() {
+      return next;
+    }
+
+    /**
+     * Records the final outcome of the request and wakes up every thread that
+     * is waiting on this entry's monitor.
+     *
+     * @param success true to mark the entry successful, false to mark it failed
+     */
+    synchronized void completed(boolean success) {
+      state = success ? SUCCESS : FAILED;
+      this.notifyAll();
+    }
+
+    /** @return true if the entry has been marked as successfully completed */
+    public boolean isSuccess() {
+      return state == SUCCESS;
+    }
+
+    @Override
+    public void setExpirationTime(long timeNano) {
+      // expiration time does not change
+    }
+
+    @Override
+    public long getExpirationTime() {
+      return expirationTime;
+    }
+  }
+
+  /**
+   * A {@link CacheEntry} that additionally carries a payload: the previous
+   * response, or the parts of it needed to build the response for a retried
+   * request.
+   */
+  public static class CacheEntryWithPayload extends CacheEntry {
+    private Object payload;
+
+    CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
+        long expirationTime) {
+      super(clientId, callId, expirationTime);
+      this.payload = payload;
+    }
+
+    /** @return the payload currently stored in this entry */
+    public Object getPayload() {
+      return payload;
+    }
+
+    /** Delegates to the superclass; declared only to keep findbugs quiet. */
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    /** Delegates to the superclass; declared only to keep findbugs quiet. */
+    @Override
+    public boolean equals(Object obj) {
+      return super.equals(obj);
+    }
+  }
+
+ private final LightWeightGSet<CacheEntry, CacheEntry> set;
+ private final long expirationTime;
+
+ /**
+ * Constructor
+ * @param cacheName name to identify the cache by
+ * @param percentage percentage of total java heap space used by this cache
+ * @param expirationTime time for an entry to expire in nanoseconds
+ */
+ public RetryCache(String cacheName, double percentage, long expirationTime) {
+ int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
+ capacity = capacity > 16 ? capacity : 16;
+ this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
+ expirationTime, 0);
+ this.expirationTime = expirationTime;
+ }
+
+  /**
+   * Decides whether the current invocation must bypass the retry cache.
+   *
+   * @return true for a non RPC invocation, or for an RPC request with an
+   *         invalid (negative) callId or the dummy clientId
+   */
+  private static boolean skipRetryCache() {
+    if (!Server.isRpcInvocation()) {
+      return true;
+    }
+    if (Server.getCallId() < 0) {
+      return true;
+    }
+    return Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
+  }
+
+  /**
+   * Looks up {@code newEntry} in the cache and handles the following
+   * conditions:
+   * <ul>
+   * <li>If there is no cache entry, {@code newEntry} is added and
+   * returned.</li>
+   * <li>If there is an existing entry, wait for its completion. If the
+   * completion state is {@link CacheEntry#FAILED}, the expectation is that the
+   * thread that waited for completion retries the request, so the
+   * {@link CacheEntry} state is set to {@link CacheEntry#INPROGRESS} again
+   * before the entry is returned.</li>
+   * <li>If the completion state is {@link CacheEntry#SUCCESS}, the entry is
+   * returned so that the thread that waited for it can return the previous
+   * response.</li>
+   * </ul>
+   * The wait is not cancellable. If the calling thread is interrupted while
+   * waiting, it keeps waiting for the entry to complete and its interrupted
+   * status is restored just before this method returns.
+   *
+   * @param newEntry entry describing the current request
+   * @return {@code newEntry} if it was added, otherwise the entry already
+   *         present in the cache; never null
+   */
+  private CacheEntry waitForCompletion(CacheEntry newEntry) {
+    CacheEntry mapEntry = null;
+    synchronized (this) {
+      mapEntry = set.get(newEntry);
+      // If an entry in the cache does not exist, add a new one
+      if (mapEntry == null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Adding Rpc request clientId "
+              + newEntry.clientIdMsb + newEntry.clientIdLsb + " callId "
+              + newEntry.callId + " to retryCache");
+        }
+        set.put(newEntry);
+        return newEntry;
+      }
+    }
+    // Entry already exists in cache. Wait for completion and return its state
+    Preconditions.checkNotNull(mapEntry,
+        "Entry from the cache should not be null");
+    boolean interrupted = false;
+    // Wait for in progress request to complete
+    synchronized (mapEntry) {
+      while (mapEntry.state == CacheEntry.INPROGRESS) {
+        try {
+          mapEntry.wait();
+        } catch (InterruptedException ie) {
+          // Only remember the interrupt here. Re-asserting it inside the loop
+          // would make the next wait() throw immediately and turn this loop
+          // into a busy spin until the entry completes.
+          interrupted = true;
+        }
+      }
+      // Previous request has failed, the expectation is that it will be
+      // retried again.
+      if (mapEntry.state != CacheEntry.SUCCESS) {
+        mapEntry.state = CacheEntry.INPROGRESS;
+      }
+    }
+    if (interrupted) {
+      // Restore the interrupted status for the caller
+      Thread.currentThread().interrupt();
+    }
+    return mapEntry;
+  }
+
+ private static CacheEntry newEntry(long expirationTime) {
+ return new CacheEntry(Server.getClientId(), Server.getCallId(),
+ expirationTime);
+ }
+
+ private static CacheEntryWithPayload newEntry(Object payload,
+ long expirationTime) {
+ return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
+ payload, expirationTime);
+ }
+
+  /**
+   * Null-safe entry point to the retry cache.
+   *
+   * @return null if the current call is not tracked in the retry cache or
+   *         {@code cache} is null; otherwise the entry for the current call
+   */
+  public static CacheEntry waitForCompletion(RetryCache cache) {
+    if (skipRetryCache() || cache == null) {
+      return null;
+    }
+    return cache.waitForCompletion(newEntry(cache.expirationTime));
+  }
+
+  /**
+   * Null-safe entry point to the retry cache for requests that keep a payload.
+   *
+   * @param payload payload stored in the entry if a new one is created
+   * @return null if the current call is not tracked in the retry cache or
+   *         {@code cache} is null; otherwise the entry for the current call
+   */
+  public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
+      Object payload) {
+    if (skipRetryCache() || cache == null) {
+      return null;
+    }
+    final CacheEntry entry =
+        cache.waitForCompletion(newEntry(payload, cache.expirationTime));
+    return (CacheEntryWithPayload) entry;
+  }
+
+  /**
+   * Marks the entry as completed with the given outcome; a null entry is
+   * ignored.
+   */
+  public static void setState(CacheEntry e, boolean success) {
+    if (e != null) {
+      e.completed(success);
+    }
+  }
+
+  /**
+   * Stores the payload in the entry and then marks it as completed with the
+   * given outcome; a null entry is ignored.
+   */
+  public static void setState(CacheEntryWithPayload e, boolean success,
+      Object payload) {
+    if (e != null) {
+      e.payload = payload;
+      e.completed(success);
+    }
+  }
+
+  /**
+   * Removes all entries from the given cache; does nothing when
+   * {@code cache} is null.
+   */
+  public static void clear(RetryCache cache) {
+    if (cache != null) {
+      // Take the same monitor that guards set.get()/set.put() in
+      // waitForCompletion, so the set is never cleared in the middle of a
+      // lookup or insertion by another thread.
+      synchronized (cache) {
+        cache.set.clear();
+      }
+    }
+  }
+}
\ No newline at end of file
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1506426&r1=1506425&r2=1506426&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Wed Jul 24 07:48:36 2013
@@ -271,6 +271,12 @@ public abstract class Server {
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+ /** Get the current call */
+ @VisibleForTesting
+ public static ThreadLocal<Call> getCurCall() {
+ return CurCall;
+ }
+
/**
* Returns the currently active RPC call's sequential ID number. A negative
* call ID indicates an invalid value, such as if there is no currently
active
@@ -278,7 +284,7 @@ public abstract class Server {
*
* @return int sequential ID number of currently active RPC call
*/
- static int getCallId() {
+ public static int getCallId() {
Call call = CurCall.get();
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
}
@@ -297,10 +303,8 @@ public abstract class Server {
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
- if (call != null) {
- return call.connection.getHostInetAddress();
- }
- return null;
+ return (call != null && call.connection != null) ? call.connection
+ .getHostInetAddress() : null;
}
/**
@@ -325,7 +329,8 @@ public abstract class Server {
*/
public static UserGroupInformation getRemoteUser() {
Call call = CurCall.get();
- return (call != null) ? call.connection.user : null;
+ return (call != null && call.connection != null) ? call.connection.user
+ : null;
}
/** Return true if the invocation was through an RPC.
@@ -463,7 +468,7 @@ public abstract class Server {
}
/** A call queued for handling. */
- private static class Call {
+ public static class Call {
private final int callId; // the client's call id
private final int retryCount; // the retry count of the call
private final Writable rpcRequest; // Serialized Rpc request from client
@@ -474,13 +479,13 @@ public abstract class Server {
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
- private Call(int id, int retryCount, Writable param,
+ public Call(int id, int retryCount, Writable param,
Connection connection) {
this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID);
}
- private Call(int id, int retryCount, Writable param, Connection connection,
+ public Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) {
this.callId = id;
this.retryCount = retryCount;
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java?rev=1506426&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
(added)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
Wed Jul 24 07:48:36 2013
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RetryCache}
+ */
+public class TestRetryCache {
+ private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
+ private static int callId = 100;
+ private static final Random r = new Random();
+ private static final TestServer testServer = new TestServer();
+
+ @Before
+ public void setup() {
+ testServer.resetCounters();
+ }
+
+  /**
+   * Minimal stand-in for a server whose single operation is protected by a
+   * {@link RetryCache}. It counts how often the operation really ran and how
+   * often a cached result was served instead.
+   */
+  static class TestServer {
+    AtomicInteger retryCount = new AtomicInteger();
+    AtomicInteger operationCount = new AtomicInteger();
+    private RetryCache retryCache = new RetryCache("TestRetryCache", 1,
+        100 * 1000 * 1000 * 1000L);
+
+    /**
+     * A server method implemented using {@link RetryCache}.
+     *
+     * @param input is returned back in echo, if {@code success} is true.
+     * @param failureOutput returned on failure, if {@code success} is false.
+     * @param methodTime time taken by the operation. By passing smaller/larger
+     *          value one can simulate an operation that takes short/long time.
+     * @param success whether this operation completes successfully or not
+     * @return return the input parameter {@code input}, if {@code success} is
+     *         true, else return {@code failureOutput}.
+     */
+    int echo(int input, int failureOutput, long methodTime, boolean success)
+        throws InterruptedException {
+      final CacheEntryWithPayload cached =
+          RetryCache.waitForCompletion(retryCache, null);
+      final boolean servedFromCache = cached != null && cached.isSuccess();
+      if (servedFromCache) {
+        System.out.println("retryCount incremented " + retryCount.get());
+        retryCount.incrementAndGet();
+        return (Integer) cached.getPayload();
+      }
+      try {
+        operationCount.incrementAndGet();
+        if (methodTime > 0) {
+          Thread.sleep(methodTime);
+        }
+      } finally {
+        // Always publish the outcome so that waiting retries are released.
+        RetryCache.setState(cached, success, input);
+      }
+      if (success) {
+        return input;
+      }
+      return failureOutput;
+    }
+
+    /** Sets both counters back to zero. */
+    void resetCounters() {
+      operationCount.set(0);
+      retryCount.set(0);
+    }
+  }
+
+  /**
+   * Creates a call with the next call ID and the fixed test client ID.
+   */
+  public static Server.Call newCall() {
+    callId++;
+    return new Server.Call(callId, 1, null, null,
+        RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
+  }
+
+  /**
+   * This simulates a long server operation that is retried. Multiple threads
+   * start an operation that takes long time and finally succeeds. The retries
+   * in this case end up waiting for the current operation to complete. All the
+   * retries then complete based on the entry in the retry cache.
+   */
+  @Test
+  public void testLongOperationsSuccessful() throws Exception {
+    // Long successful operations; no entry is expected in the cache when the
+    // first operation starts.
+    final int input = r.nextInt();
+    final Server.Call call = newCall();
+    testOperations(input, 100, 20, true, false, call);
+  }
+
+  /**
+   * This simulates a long server operation. Multiple threads start an
+   * operation that takes long time and finally fails. The retries in this case
+   * end up waiting for the current operation to complete. All the retries end
+   * up performing the operation again.
+   */
+  @Test
+  public void testLongOperationsFailure() throws Exception {
+    // Long failed operations; no entry is expected in the cache when the
+    // first operation starts.
+    final int input = r.nextInt();
+    final Server.Call call = newCall();
+    testOperations(input, 100, 20, false, false, call);
+  }
+
+  /**
+   * This simulates a short server operation. Multiple threads start an
+   * operation that takes very short time and finally succeeds. The retries in
+   * this case do not wait long for the current operation to complete. All the
+   * retries then complete based on the entry in the retry cache.
+   */
+  @Test
+  public void testShortOperationsSuccess() throws Exception {
+    // Test short successful operations
+    // There is no entry in cache expected when the first operation starts
+    // The success flag must be true here; passing false would only repeat
+    // testShortOperationsFailure and leave this path untested.
+    testOperations(r.nextInt(), 25, 0, true, false, newCall());
+  }
+
+  /**
+   * This simulates a short server operation. Multiple threads start an
+   * operation that takes short time and finally fails. The retries in this
+   * case do not wait for the current operation to complete. All the retries
+   * end up performing the operation again.
+   */
+  @Test
+  public void testShortOperationsFailure() throws Exception {
+    // Short failed operations; no entry is expected in the cache when the
+    // first operation starts.
+    final int input = r.nextInt();
+    final Server.Call call = newCall();
+    testOperations(input, 25, 0, false, false, call);
+  }
+
+  /**
+   * Runs the operation once successfully on the calling thread, then lets
+   * multiple threads retry the same call.
+   */
+  @Test
+  public void testRetryAfterSuccess() throws Exception {
+    // Previous operation successfully completed
+    final Server.Call previousCall = newCall();
+    final int input = r.nextInt();
+    final int failureOutput = input + 1;
+    Server.getCurCall().set(previousCall);
+    testServer.echo(input, failureOutput, 5, true);
+    testOperations(input, 25, 0, true, true, previousCall);
+  }
+
+  /**
+   * Runs the operation once unsuccessfully on the calling thread, then lets
+   * multiple threads retry the same call.
+   */
+  @Test
+  public void testRetryAfterFailure() throws Exception {
+    // Previous operation failed
+    final Server.Call previousCall = newCall();
+    final int input = r.nextInt();
+    final int failureOutput = input + 1;
+    Server.getCurCall().set(previousCall);
+    testServer.echo(input, failureOutput, 5, false);
+    testOperations(input, 25, 0, false, true, previousCall);
+  }
+
+  /**
+   * Submits the same call from {@code numberOfThreads} threads at once and
+   * verifies both the value every thread gets back and the resulting
+   * operation/retry counters of the test server.
+   *
+   * @param input value passed to the echo operation
+   * @param numberOfThreads number of concurrent invocations of the same call
+   * @param pause upper bound (exclusive) of the random time each operation
+   *          takes; 0 means the operation does not pause at all
+   * @param success whether the operation is to complete successfully
+   * @param attemptedBefore whether the call was already executed once before
+   *          this method is invoked
+   * @param call the call every worker thread installs as its current call
+   */
+  public void testOperations(final int input, final int numberOfThreads,
+      final int pause, final boolean success, final boolean attemptedBefore,
+      final Server.Call call) throws InterruptedException, ExecutionException {
+    final int failureOutput = input + 1;
+    ExecutorService executorService = Executors
+        .newFixedThreadPool(numberOfThreads);
+    try {
+      List<Future<Integer>> list = new ArrayList<Future<Integer>>();
+      for (int i = 0; i < numberOfThreads; i++) {
+        Callable<Integer> worker = new Callable<Integer>() {
+          @Override
+          public Integer call() throws Exception {
+            Server.getCurCall().set(call);
+            Assert.assertEquals(call, Server.getCurCall().get());
+            int randomPause = pause == 0 ? pause : r.nextInt(pause);
+            return testServer.echo(input, failureOutput, randomPause, success);
+          }
+        };
+        Future<Integer> submit = executorService.submit(worker);
+        list.add(submit);
+      }
+
+      Assert.assertEquals(numberOfThreads, list.size());
+      for (Future<Integer> future : list) {
+        if (success) {
+          Assert.assertEquals(input, future.get().intValue());
+        } else {
+          Assert.assertEquals(failureOutput, future.get().intValue());
+        }
+      }
+    } finally {
+      // Release the pool threads even when an assertion or a worker fails;
+      // otherwise every test leaks numberOfThreads idle threads.
+      executorService.shutdown();
+    }
+
+    if (success) {
+      // If the operation was successful, all the subsequent operations
+      // by other threads should be retries. Operation count should be 1.
+      int retries = numberOfThreads + (attemptedBefore ? 0 : -1);
+      Assert.assertEquals(1, testServer.operationCount.get());
+      Assert.assertEquals(retries, testServer.retryCount.get());
+    } else {
+      // If the operation failed, all the subsequent operations
+      // should execute once more, hence the retry count should be 0 and
+      // operation count should be the number of tries
+      int opCount = numberOfThreads + (attemptedBefore ? 1 : 0);
+      Assert.assertEquals(opCount, testServer.operationCount.get());
+      Assert.assertEquals(0, testServer.retryCount.get());
+    }
+  }
+}