Author: apurtell Date: Wed Feb 5 18:00:09 2014 New Revision: 1564851 URL: http://svn.apache.org/r1564851 Log: HBASE-10337 HTable.get() uninterruptible (Nicolas Liochon)
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1564851&r1=1564850&r2=1564851&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Feb 5 18:00:09 2014 @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.security. 
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -532,6 +533,7 @@ public class HConnectionManager { try { connection.close(); } catch (Exception e) { + ExceptionUtil.rethrowIfInterrupt(e); if (connectSucceeded) { throw new IOException("The connection to " + connection + " could not be deleted.", e); @@ -1123,7 +1125,11 @@ public class HConnectionManager { MetaScanner.metaScan(conf, this, visitor, tableName, row, this.prefetchRegionLimit, TableName.META_TABLE_NAME); } catch (IOException e) { - LOG.warn("Encountered problems when prefetch hbase:meta table: ", e); + if (ExceptionUtil.isInterrupt(e)) { + Thread.currentThread().interrupt(); + } else { + LOG.warn("Encountered problems when prefetch hbase:meta table: ", e); + } } } @@ -1252,6 +1258,8 @@ public class HConnectionManager { // from the HTable constructor. throw e; } catch (IOException e) { + ExceptionUtil.rethrowIfInterrupt(e); + if (e instanceof RemoteException) { e = ((RemoteException)e).unwrapRemoteException(); } @@ -1528,6 +1536,7 @@ public class HConnectionManager { try { zkw = getKeepAliveZooKeeperWatcher(); } catch (IOException e) { + ExceptionUtil.rethrowIfInterrupt(e); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); } try { @@ -1588,7 +1597,7 @@ public class HConnectionManager { if (exceptionCaught != null) // It failed. If it's not the last try, we're going to wait a little - if (tries < numTries) { + if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) { // tries at this point is 1 or more; decrement to start from 0. 
long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1); LOG.info("getMaster attempt " + tries + " of " + numTries + @@ -1598,7 +1607,7 @@ public class HConnectionManager { try { Thread.sleep(pauseTime); } catch (InterruptedException e) { - throw new RuntimeException( + throw new MasterNotRunningException( "Thread was interrupted while trying to connect to master.", e); } } else { Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1564851&r1=1564850&r2=1564851&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java Wed Feb 5 18:00:09 2014 @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetr import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; import com.google.protobuf.ServiceException; @@ -130,6 +131,7 @@ public class RpcRetryingCaller<T> { new RetriesExhaustedException.ThrowableWithExtraContext(t, EnvironmentEdgeManager.currentTimeMillis(), toString()); exceptions.add(qt); + ExceptionUtil.rethrowIfInterrupt(t); if (tries >= retries - 1) { throw new RetriesExhaustedException(tries, exceptions); } @@ -184,6 +186,7 @@ public class RpcRetryingCaller<T> { return callable.call(); } catch (Throwable t) { Throwable t2 = translateException(t); + ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. 
if (t2 instanceof IOException) { throw (IOException)t2; Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1564851&r1=1564850&r2=1564851&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Feb 5 18:00:09 2014 @@ -26,6 +26,7 @@ import java.io.DataOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.security. import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; @@ -642,18 +644,19 @@ public class RpcClient { */ private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) throws IOException { - closeConnection(); // throw the exception if the maximum number of retries is reached - if (curRetries >= maxRetries) { + if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) { throw ioe; } // otherwise back off and retry try { Thread.sleep(failureSleep); - } catch (InterruptedException ignored) {} + } catch (InterruptedException ie) { + ExceptionUtil.rethrowIfInterrupt(ie); + } LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " + failureSleep + "ms. 
Already tried " + curRetries + @@ -672,7 +675,9 @@ public class RpcClient { if (timeout>0) { try { wait(timeout); - } catch (InterruptedException ignored) {} + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } } @@ -1112,6 +1117,8 @@ public class RpcClient { // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. closeException = e; + } if (ExceptionUtil.isInterrupt(e)){ + } else { // Treat this as a fatal condition and close this connection markClosed(e); @@ -1425,24 +1432,14 @@ public class RpcClient { Connection connection = getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); connection.writeRequest(call, priority); // send the parameter - boolean interrupted = false; + //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (call) { while (!call.done) { if (connection.shouldCloseConnection.get()) { throw new IOException("Unexpected closed connection"); } - try { - call.wait(1000); // wait for the result - } catch (InterruptedException ignored) { - // save the fact that we were interrupted - interrupted = true; - } - } - - if (interrupted) { - // set the interrupt flag now that we are done waiting - Thread.currentThread().interrupt(); + call.wait(1000); // wait for the result } if (call.error != null) { Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1564851&r1=1564850&r2=1564851&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Feb 5 18:00:09 2014 @@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security. 
import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DynamicClassLoader; +import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Text; @@ -276,8 +277,11 @@ public final class ProtobufUtil { if (e == null) { return new IOException(se); } + if (ExceptionUtil.isInterrupt(e)) { + return ExceptionUtil.asInterrupt(e); + } if (e instanceof RemoteException) { - e = ((RemoteException)e).unwrapRemoteException(); + e = ((RemoteException) e).unwrapRemoteException(); } return e instanceof IOException ? (IOException) e : new IOException(se); } Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java?rev=1564851&view=auto ============================================================================== --- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java (added) +++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java Wed Feb 5 18:00:09 2014 @@ -0,0 +1,54 @@ +package org.apache.hadoop.hbase.util; + +import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedByInterruptException; + +/** + * This class handles the different interruption classes. + * It can be: + * - InterruptedException + * - InterruptedIOException (inherits IOException); used in IO + * - ClosedByInterruptException (inherits IOException) + * , - SocketTimeoutException inherits InterruptedIOException but is not a real + * interruption, so we have to distinguish the case. This pattern is unfortunately common. + */ +public class ExceptionUtil { + + /** + * @return true if the throwable comes an interruption, false otherwise. 
+ */ + public static boolean isInterrupt(Throwable t) { + if (t instanceof InterruptedException) return true; + if (t instanceof SocketTimeoutException) return false; + return (t instanceof InterruptedIOException); + } + + /** + * @throws InterruptedIOException if t was an interruption. Does nothing otherwise. + */ + public static void rethrowIfInterrupt(Throwable t) throws InterruptedIOException { + InterruptedIOException iie = asInterrupt(t); + if (iie != null) throw iie; + } + + /** + * @return an InterruptedIOException if t was an interruption, null otherwise + */ + public static InterruptedIOException asInterrupt(Throwable t) { + if (t instanceof SocketTimeoutException) return null; + + if (t instanceof InterruptedIOException) return (InterruptedIOException) t; + + if (t instanceof InterruptedException) { + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(t); + return iie; + } + + return null; + } + + private ExceptionUtil() { + } +} Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java?rev=1564851&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java (added) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java Wed Feb 5 18:00:09 2014 @@ -0,0 +1,147 @@ +package org.apache.hadoop.hbase.client; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import 
org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +@Category(MediumTests.class) +public class TestClientOperationInterrupt { + private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class); + + private static HBaseTestingUtility util; + private static final byte[] tableName = Bytes.toBytes("test"); + private static final byte[] dummy = Bytes.toBytes("dummy"); + private static final byte[] row1 = Bytes.toBytes("r1"); + private static final byte[] test = Bytes.toBytes("test"); + private static Configuration conf; + + public static class TestCoprocessor extends BaseRegionObserver { + @Override + public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, + final Get get, final List<Cell> results) throws IOException { + Threads.sleep(2500); + } + } + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = HBaseConfiguration.create(); + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessor.class.getName()); + util = new HBaseTestingUtility(conf); + util.startMiniCluster(); + + HBaseAdmin admin = util.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + } + admin.deleteTable(tableName); + } + util.createTable(tableName, new 
byte[][]{dummy, test}); + + HTable ht = new HTable(conf, tableName); + Put p = new Put(row1); + p.add(dummy, dummy, dummy); + ht.put(p); + } + + + @Test + public void testInterrupt50Percent() throws IOException, InterruptedException { + final AtomicInteger noEx = new AtomicInteger(0); + final AtomicInteger badEx = new AtomicInteger(0); + final AtomicInteger noInt = new AtomicInteger(0); + final AtomicInteger done = new AtomicInteger(0); + List<Thread> threads = new ArrayList<Thread>(); + + final int nbThread = 100; + + for (int i = 0; i < nbThread; i++) { + Thread t = new Thread() { + @Override + public void run() { + try { + HTable ht = new HTable(conf, tableName); + Result r = ht.get(new Get(row1)); + noEx.incrementAndGet(); + } catch (IOException e) { + LOG.info("exception", e); + if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) { + badEx.incrementAndGet(); + } else { + if (Thread.currentThread().isInterrupted()) { + noInt.incrementAndGet(); + LOG.info("The thread should NOT be with the 'interrupt' status."); + } + } + } finally { + done.incrementAndGet(); + } + } + }; + t.setName("TestClientOperationInterrupt #" + i); + threads.add(t); + t.start(); + } + + for (int i = 0; i < nbThread / 2; i++) { + threads.get(i).interrupt(); + } + + + boolean stillAlive = true; + while (stillAlive) { + stillAlive = false; + for (Thread t : threads) { + if (t.isAlive()) { + stillAlive = true; + } + } + Threads.sleep(10); + } + + Assert.assertFalse(Thread.currentThread().isInterrupted()); + + Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(), + noEx.get() == nbThread / 2 && badEx.get() == 0); + + // The problem here is that we need the server to free its handlers to handle all operations + while (done.get() != nbThread){ + Thread.sleep(1); + } + + HTable ht = new HTable(conf, tableName); + Result r = ht.get(new Get(row1)); + Assert.assertFalse(r.isEmpty()); + } + + @AfterClass + public static void 
tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +}