Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java Fri Jul 26 22:38:15 2013 @@ -24,7 +24,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.client.RegionServerCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; @@ -75,14 +76,15 @@ public class RegionCoprocessorRpcChannel .setServiceName(method.getService().getFullName()) .setMethodName(method.getName()) .setRequest(request.toByteString()).build(); - ServerCallable<CoprocessorServiceResponse> callable = - new ServerCallable<CoprocessorServiceResponse>(connection, table, row) { + RegionServerCallable<CoprocessorServiceResponse> callable = + new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { public CoprocessorServiceResponse call() throws Exception { - byte[] regionName = location.getRegionInfo().getRegionName(); - return ProtobufUtil.execService(stub, call, regionName); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + return ProtobufUtil.execService(getStub(), call, regionName); } }; - CoprocessorServiceResponse result = callable.withRetries(); + CoprocessorServiceResponse result = new RpcRetryingCaller<CoprocessorServiceResponse>(). + callWithRetries(callable, this.connection.getConfiguration()); Message response = null; if (result.getValue().hasValue()) { response = responsePrototype.newBuilderForType()
Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original) +++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Fri Jul 26 22:38:15 2013 @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -@SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Category(MediumTests.class) public class TestAsyncProcess { private static final byte[] DUMMY_TABLE = "DUMMY_TABLE".getBytes(); @@ -65,42 +64,40 @@ public class TestAsyncProcess { private static Exception failure = new Exception("failure"); static class MyAsyncProcess<Res> extends AsyncProcess<Res> { - - public MyAsyncProcess(HConnection hc, - AsyncProcessCallback<Res> callback, Configuration conf) { + public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) { super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), - Threads.newDaemonThreadFactory("test-TestAsyncProcess")) - , callback, conf); + new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")), + callback, conf); } - /** - * Do not call a server, fails if the rowkey of the operation is{@link #FAILS} - */ @Override - protected ServerCallable<MultiResponse> createCallable( - final HRegionLocation loc, final MultiAction<Row> multi) { - - final MultiResponse mr = new MultiResponse(); - for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) { - for (Action a : entry.getValue()) { - if (Arrays.equals(FAILS, a.getAction().getRow())) { - mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure); - } else { - mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success); - } - } - } - - return new MultiServerCallable<Row>(hConnection, tableName, null, null) { + protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti()); + return new RpcRetryingCaller<MultiResponse>() { @Override - public MultiResponse withoutRetries() { + public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable) + throws IOException, RuntimeException { return mr; } }; } } + static MultiResponse createMultiResponse(final HRegionLocation loc, + final MultiAction<Row> multi) { + final MultiResponse mr = new MultiResponse(); + for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) { + for (Action a : entry.getValue()) { + if (Arrays.equals(FAILS, a.getAction().getRow())) { + mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure); + } else { + mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success); + } + } + } + return mr; + } + /** * Returns our async process. */ Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original) +++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Fri Jul 26 22:38:15 2013 @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.util; +import java.io.InterruptedIOException; import java.io.PrintWriter; import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.LinkedBlockingQueue; @@ -129,6 +130,7 @@ public class Threads { } /** + * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns * @param millis How long to sleep for in milliseconds. */ public static void sleep(long millis) { @@ -136,6 +138,7 @@ public class Threads { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); + Thread.currentThread().interrupt(); } } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Jul 26 22:38:15 2013 @@ -60,7 +60,8 @@ import org.apache.hadoop.hbase.exception import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.client.RegionServerCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; @@ -536,23 +537,24 @@ public class LoadIncrementalHFiles exten famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); } - final ServerCallable<Boolean> svrCallable = new ServerCallable<Boolean>(conn, - tableName, first) { + final RegionServerCallable<Boolean> svrCallable = + new RegionServerCallable<Boolean>(conn, tableName, first) { @Override public Boolean call() throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; try { - LOG.debug("Going to connect to server " + location + " for row " - + Bytes.toStringBinary(row)); - byte[] regionName = location.getRegionInfo().getRegionName(); + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); if(!useSecure) { - success = ProtobufUtil.bulkLoadHFile(stub, famPaths, regionName, assignSeqIds); + success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); } else { - HTable table = new HTable(conn.getConfiguration(), tableName); + HTable table = new HTable(conn.getConfiguration(), getTableName()); secureClient = new SecureBulkLoadClient(table); - success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey()); + success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, + getLocation().getRegionInfo().getStartKey()); } return success; } finally { @@ -586,7 +588,7 @@ public class LoadIncrementalHFiles exten try { List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>(); - boolean success = svrCallable.withRetries(); + boolean success = new RpcRetryingCaller<Boolean>().callWithRetries(svrCallable, getConf()); if (!success) { LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) + " into table " Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java Fri Jul 26 22:38:15 2013 @@ -34,15 +34,11 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.ServerCallable; -import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; -import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; @@ -166,7 +162,7 @@ public class WALEditsReplaySink { try { ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>( this.conn, this.tableName, regionLoc, regionInfo, actions); - callable.withRetries(); + new RpcRetryingCaller<MultiResponse>().callWithRetries(callable, conf, this.replayTimeout); } catch (IOException ie) { if (skipErrors) { LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS @@ -181,19 +177,17 @@ public class WALEditsReplaySink { * Callable that handles the <code>replay</code> method call going against a single regionserver * @param <R> */ - class ReplayServerCallable<R> extends ServerCallable<MultiResponse> { + class ReplayServerCallable<R> extends RegionServerCallable<MultiResponse> { private HRegionInfo regionInfo; private List<Action<Row>> actions; - private Map<HRegionLocation, Map<HRegionInfo, List<Action<Row>>>> retryActions = null; - ReplayServerCallable(final HConnection connection, final byte [] tableName, final HRegionLocation regionLoc, final HRegionInfo regionInfo, final List<Action<Row>> actions) { - super(connection, tableName, null, replayTimeout); + super(connection, tableName, null); this.actions = actions; this.regionInfo = regionInfo; - this.location = regionLoc; + setLocation(regionLoc); } @Override @@ -208,7 +202,7 @@ public class WALEditsReplaySink { private void replayToServer(HRegionInfo regionInfo, List<Action<Row>> actions) throws IOException, ServiceException { - AdminService.BlockingInterface remoteSvr = connection.getAdmin(location.getServerName()); + AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName()); MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(), actions); MultiResponse protoResults = remoteSvr.replay(null, request); @@ -235,16 +229,14 @@ public class WALEditsReplaySink { @Override public void prepare(boolean reload) throws IOException { if (!reload) return; - // relocate regions in case we have a new dead server or network hiccup // if not due to connection issue, the following code should run fast because it uses // cached location for (Action<Row> action : actions) { // use first row to relocate region because all actions are for one region - this.location = this.connection.locateRegion(tableName, action.getAction().getRow()); + setLocation(conn.locateRegion(tableName, action.getAction().getRow())); break; } } } - -} +} \ No newline at end of file Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Fri Jul 26 22:38:15 2013 @@ -43,7 +43,7 @@ public class HConnectionTestingUtility { * configuration instance. Minimally the mock will return * <code>conf</conf> when {@link HConnection#getConfiguration()} is invoked. * Be sure to shutdown the connection when done by calling - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it + * {@link HConnectionManager#deleteConnection(Configuration)} else it * will stick around; this is probably not what you want. * @param conf configuration * @return HConnection object for <code>conf</code> @@ -69,7 +69,7 @@ public class HConnectionTestingUtility { * more of the popular {@link HConnection} methods so they do 'normal' * operation (see return doc below for list). Be sure to shutdown the * connection when done by calling - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it + * {@link HConnectionManager#deleteConnection(Configuration)} else it * will stick around; this is probably not what you want. * * @param conf Configuration to use @@ -88,7 +88,7 @@ public class HConnectionTestingUtility { * {@link HConnection#getAdmin(ServerName)} is called, returns the passed * {@link ClientProtos.ClientService.BlockingInterface} instance when * {@link HConnection#getClient(ServerName)} is called (Be sure to call - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} + * {@link HConnectionManager#deleteConnection(Configuration)} * when done with this mocked Connection. * @throws IOException */ @@ -123,7 +123,7 @@ public class HConnectionTestingUtility { * Get a Mockito spied-upon {@link HConnection} that goes with the passed * <code>conf</code> configuration instance. * Be sure to shutdown the connection when done by calling - * {@link HConnectionManager#deleteConnection(HConnectionKey, boolean)} else it + * {@link HConnectionManager#deleteConnection(Configuration)} else it * will stick around; this is probably not what you want. * @param conf configuration * @return HConnection object for <code>conf</code> Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java?rev=1507495&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java (added) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java Fri Jul 26 22:38:15 2013 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; +import org.apache.hadoop.hbase.exceptions.PleaseHoldException; +import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mortbay.log.Log; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +@Category(SmallTests.class) +public class TestHBaseAdminNoCluster { + /** + * Verify that PleaseHoldException gets retried. + * HBASE-8764 + * @throws IOException + * @throws ZooKeeperConnectionException + * @throws MasterNotRunningException + * @throws ServiceException + */ + @Test + public void testMasterMonitorCollableRetries() + throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException { + Configuration configuration = HBaseConfiguration.create(); + // Set the pause and retry count way down. + configuration.setLong(HConstants.HBASE_CLIENT_PAUSE, 1); + final int count = 10; + configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); + // Get mocked connection. Getting the connection will register it so when HBaseAdmin is + // constructed with same configuration, it will find this mocked connection. + HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); + // Mock so we get back the master interface. Make it so when createTable is called, we throw + // the PleaseHoldException. + MasterAdminKeepAliveConnection masterAdmin = + Mockito.mock(MasterAdminKeepAliveConnection.class); + Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(), + (MasterAdminProtos.CreateTableRequest)Mockito.any())). + thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test"))); + Mockito.when(connection.getKeepAliveMasterAdminService()).thenReturn(masterAdmin); + // Mock up our admin Interfaces + HBaseAdmin admin = new HBaseAdmin(configuration); + try { + HTableDescriptor htd = new HTableDescriptor("testMasterMonitorCollableRetries"); + // Pass any old htable descriptor; not important + try { + admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + fail(); + } catch (RetriesExhaustedException e) { + Log.info("Expected fail", e); + } + // Assert we were called 'count' times. + Mockito.verify(masterAdmin, Mockito.atLeast(count)).createTable((RpcController)Mockito.any(), + (MasterAdminProtos.CreateTableRequest)Mockito.any()); + } finally { + admin.close(); + if (connection != null)HConnectionManager.deleteConnection(configuration); + } + } +} \ No newline at end of file Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Fri Jul 26 22:38:15 2013 @@ -32,10 +32,11 @@ import org.apache.hadoop.hbase.Multithre import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ServerCallable; import org.apache.hadoop.hbase.exceptions.TableExistsException; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -141,41 +142,43 @@ public class TestHRegionServerBulkLoad { } // bulk load HFiles - HConnection conn = UTIL.getHBaseAdmin().getConnection(); + final HConnection conn = UTIL.getHBaseAdmin().getConnection(); byte[] tbl = Bytes.toBytes(tableName); - new ServerCallable<Void>(conn, tbl, Bytes - .toBytes("aaa")) { + RegionServerCallable<Void> callable = + new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) { @Override public Void call() throws Exception { - LOG.debug("Going to connect to server " + location + " for row " - + Bytes.toStringBinary(row)); - byte[] regionName = location.getRegionInfo().getRegionName(); + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true); - stub.bulkLoadHFile(null, request); + getStub().bulkLoadHFile(null, request); return null; } - }.withRetries(); + }; + RpcRetryingCaller<Void> caller = new RpcRetryingCaller<Void>(); + caller.callWithRetries(callable, UTIL.getConfiguration()); // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 10 == 0) { // 10 * 50 = 500 open file handles! - new ServerCallable<Void>(conn, tbl, - Bytes.toBytes("aaa")) { + callable = new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")) { @Override public Void call() throws Exception { - LOG.debug("compacting " + location + " for row " - + Bytes.toStringBinary(row)); + LOG.debug("compacting " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server = - connection.getAdmin(location.getServerName()); + conn.getAdmin(getLocation().getServerName()); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest( - location.getRegionInfo().getRegionName(), true, null); + getLocation().getRegionInfo().getRegionName(), true, null); server.compactRegion(null, request); numCompactions.incrementAndGet(); return null; } - }.withRetries(); + }; + caller.callWithRetries(callable, UTIL.getConfiguration()); } } } Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1507495&r1=1507494&r2=1507495&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Fri Jul 26 22:38:15 2013 @@ -22,6 +22,7 @@ import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Method; +import java.net.BindException; import java.util.TreeMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; @@ -377,7 +379,7 @@ public class TestHLog { * [FSNamesystem.nextGenerationStampForBlock]) * 3. HDFS-142 (on restart, maintain pendingCreates) */ - @Test + @Test (timeout=300000) public void testAppendClose() throws Exception { byte [] tableName = Bytes.toBytes(getName()); HRegionInfo regioninfo = new HRegionInfo(tableName, @@ -422,16 +424,16 @@ public class TestHLog { Thread.sleep(1000); } assertFalse(cluster.isClusterUp()); - - // Workaround a strange issue with Hadoop's RPC system - if we don't - // sleep here, the new datanodes will pick up a cached IPC connection to - // the old (dead) NN and fail to start. Sleeping 2 seconds goes past - // the idle time threshold configured in the conf above - Thread.sleep(2000); - - LOG.info("Waiting a few seconds before re-starting HDFS"); - Thread.sleep(5000); - cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort); + cluster = null; + for (int i = 0; i < 100; i++) { + try { + cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort); + break; + } catch (BindException e) { + LOG.info("Sleeping. BindException bringing up new cluster"); + Threads.sleep(1000); + } + } cluster.waitActive(); fs = cluster.getFileSystem(); LOG.info("STARTED second instance.");
