Repository: hbase Updated Branches: refs/heads/master c1293cc91 -> b3ae87bd7
HBASE-17251 Add a timeout parameter when locating region Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b3ae87bd Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b3ae87bd Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b3ae87bd Branch: refs/heads/master Commit: b3ae87bd7dd539fd8d5409076933d4528ff6b14c Parents: c1293cc Author: zhangduo <[email protected]> Authored: Wed Dec 7 16:57:04 2016 +0800 Committer: zhangduo <[email protected]> Committed: Thu Dec 8 09:55:29 2016 +0800 ---------------------------------------------------------------------- .../hbase/client/AsyncConnectionImpl.java | 7 +- .../hadoop/hbase/client/AsyncRegionLocator.java | 64 +++++++--- .../AsyncSingleRequestRpcRetryingCaller.java | 45 +++++-- .../client/AsyncTableRegionLocatorImpl.java | 2 +- .../hbase/client/TestAsyncRegionLocator.java | 28 ++--- .../client/TestAsyncRegionLocatorTimeout.java | 120 +++++++++++++++++++ ...TestAsyncSingleRequestRpcRetryingCaller.java | 54 +++++---- 7 files changed, 253 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 5c32a9f..92785fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; +import com.google.common.annotations.VisibleForTesting; + import io.netty.util.HashedWheelTimer; import java.io.IOException; @@ -56,7 +58,8 @@ class AsyncConnectionImpl implements AsyncConnection { private static final Log LOG = LogFactory.getLog(AsyncConnectionImpl.class); - private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + @VisibleForTesting + static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; @@ -92,7 +95,7 @@ class AsyncConnectionImpl implements AsyncConnection { this.conf = conf; this.user = user; this.connConf = new AsyncConnectionConfiguration(conf); - this.locator = new AsyncRegionLocator(this); + this.locator = new AsyncRegionLocator(this, RETRY_TIMER); this.registry = AsyncRegistryFactory.getRegistry(conf); this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 6b74e4c..ae8f2a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -27,6 +27,9 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; + import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -36,9 +39,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,6 +55,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.util.Bytes; /** @@ -62,6 +68,8 @@ class AsyncRegionLocator { private final AsyncConnectionImpl conn; + private final HashedWheelTimer retryTimer; + private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>(); private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture = @@ -70,8 +78,9 @@ class AsyncRegionLocator { private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache = new ConcurrentHashMap<>(); - AsyncRegionLocator(AsyncConnectionImpl conn) { + AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { this.conn = conn; + this.retryTimer = retryTimer; } private CompletableFuture<HRegionLocation> locateMetaRegion() { @@ -249,9 +258,6 @@ class AsyncRegionLocator { return; } otherCheck.accept(loc); - if (future.isDone()) { - return; - } addToCache(loc); future.complete(loc); } @@ -282,12 +288,34 @@ class AsyncRegionLocator { return locateInMeta(tableName, row); } - CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) { - if (tableName.equals(META_TABLE_NAME)) { - return locateMetaRegion(); - } else { - return locateRegion(tableName, row); - } + private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future, + long timeoutNs, Supplier<String> timeoutMsg) { + if (future.isDone() || timeoutNs <= 0) { + return future; + } + CompletableFuture<HRegionLocation> timeoutFuture = new CompletableFuture<>(); + Timeout timeoutTask = retryTimer.newTimeout( + t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs, + TimeUnit.NANOSECONDS); + future.whenComplete((loc, error) -> { + timeoutTask.cancel(); + if (error != null) { + timeoutFuture.completeExceptionally(error); + } else { + timeoutFuture.complete(loc); + } + }); + return timeoutFuture; + } + + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + long timeoutNs) { + CompletableFuture<HRegionLocation> future = + tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row); + return withTimeout(future, timeoutNs, + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + + "ms) waiting for region location for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "'"); } private HRegionLocation locatePreviousInCache(TableName tableName, @@ -356,14 +384,18 @@ class AsyncRegionLocator { /** * Locate the previous region using the current regions start key. Used for reverse scan. + * <p> + * TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow + * of a region. */ CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, - byte[] startRowOfCurrentRegion) { - if (tableName.equals(META_TABLE_NAME)) { - return locateMetaRegion(); - } else { - return locatePreviousRegion(tableName, startRowOfCurrentRegion); - } + byte[] startRowOfCurrentRegion, long timeoutNs) { + CompletableFuture<HRegionLocation> future = tableName.equals(META_TABLE_NAME) + ? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion); + return withTimeout(future, timeoutNs, + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + + "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='" + + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"); } private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 36687c6..44a237d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.commons.logging.Log; @@ -51,6 +52,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> { private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class); + // Add a delta to avoid timeout immediately after a retry sleeping. + private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); + @FunctionalInterface public interface Callable<T> { CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc, @@ -65,7 +69,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> { private final byte[] row; - private final Supplier<CompletableFuture<HRegionLocation>> locate; + private final Function<Long, CompletableFuture<HRegionLocation>> locate; private final Callable<T> callable; @@ -118,6 +122,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); } + private long remainingTimeNs() { + return operationTimeoutNs - (System.nanoTime() - startNs); + } + private void completeExceptionally() { future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); } @@ -138,7 +146,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> { } long delayNs; if (operationTimeoutNs > 0) { - long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs); + long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { completeExceptionally(); return; @@ -153,6 +161,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> { } private void call(HRegionLocation loc) { + long callTimeoutNs; + if (operationTimeoutNs > 0) { + callTimeoutNs = remainingTimeNs(); + if (callTimeoutNs <= 0) { + completeExceptionally(); + return; + } + callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); + } else { + callTimeoutNs = rpcTimeoutNs; + } ClientService.Interface stub; try { stub = conn.getRegionServerStub(loc.getServerName()); @@ -166,7 +185,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> { err -> conn.getLocator().updateCachedLocation(loc, err)); return; } - resetController(controller, rpcTimeoutNs); + resetController(controller, callTimeoutNs); callable.call(controller, loc, stub).whenComplete((result, error) -> { if (error != null) { onError(error, @@ -183,7 +202,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> { } private void locateThenCall() { - locate.get().whenComplete((loc, error) -> { + long locateTimeoutNs; + if (operationTimeoutNs > 0) { + locateTimeoutNs = remainingTimeNs(); + if (locateTimeoutNs <= 0) { + completeExceptionally(); + return; + } + } else { + locateTimeoutNs = -1L; + } + locate.apply(locateTimeoutNs).whenComplete((loc, error) -> { if (error != null) { onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " @@ -198,12 +227,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> { }); } - private CompletableFuture<HRegionLocation> locate() { - return conn.getLocator().getRegionLocation(tableName, row); + private CompletableFuture<HRegionLocation> locate(long timeoutNs) { + return conn.getLocator().getRegionLocation(tableName, row, timeoutNs); } - private CompletableFuture<HRegionLocation> locatePrevious() { - return conn.getLocator().getPreviousRegionLocation(tableName, row); + private CompletableFuture<HRegionLocation> locatePrevious(long timeoutNs) { + return conn.getLocator().getPreviousRegionLocation(tableName, row, timeoutNs); } public CompletableFuture<T> call() { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index b29f878..e1f40a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { @Override public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { - return locator.getRegionLocation(tableName, row); + return locator.getRegionLocation(tableName, row, 0L); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java index 2e46d8a..a679192 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -102,12 +102,12 @@ public class TestAsyncRegionLocator { @Test public void testNoTable() throws InterruptedException { try { - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } try { - LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -118,12 +118,12 @@ public class TestAsyncRegionLocator { createSingleRegionTable(); TEST_UTIL.getAdmin().disableTable(TABLE_NAME); try { - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } try { - LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -143,17 +143,17 @@ public class TestAsyncRegionLocator { createSingleRegionTable(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)]; ThreadLocalRandom.current().nextBytes(randKey); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, randKey).get()); + LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get()); // Use a key which is not the endKey of a region will cause error try { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get()); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get()); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(IOException.class)); assertTrue(e.getCause().getMessage().contains("end key of")); @@ -193,7 +193,7 @@ public class TestAsyncRegionLocator { IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], - serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get()); + serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -204,7 +204,7 @@ public class TestAsyncRegionLocator { n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { try { assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i], - LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get()); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -215,7 +215,7 @@ public class TestAsyncRegionLocator { public void testRegionMove() throws IOException, InterruptedException, ExecutionException { createSingleRegionTable(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); - HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); + HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get(); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc); ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) @@ -228,12 +228,12 @@ public class TestAsyncRegionLocator { Thread.sleep(100); } // Should be same as it is in cache - assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); LOCATOR.updateCachedLocation(loc, null); // null error will not trigger a cache cleanup - assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); LOCATOR.updateCachedLocation(loc, new NotServingRegionException()); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java new file mode 100644 index 0000000..2a902a6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncRegionLocatorTimeout { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static AsyncConnectionImpl CONN; + + private static AsyncRegionLocator LOCATOR; + + private static volatile long SLEEP_MS = 0L; + + public static class SleepRegionObserver extends BaseRegionObserver { + + @Override + public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, + RegionScanner s) throws IOException { + if (SLEEP_MS > 0) { + Threads.sleepWithoutInterrupt(SLEEP_MS); + } + return super.preScannerOpen(e, scan, s); + } + } + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(REGION_COPROCESSOR_CONF_KEY, SleepRegionObserver.class.getName()); + conf.setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + CONN = new AsyncConnectionImpl(conf, User.getCurrent()); + LOCATOR = CONN.getLocator(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws InterruptedException, ExecutionException { + SLEEP_MS = 1000; + long startNs = System.nanoTime(); + try { + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500)) + .get(); + fail(); + } catch (ExecutionException e) { + e.printStackTrace(); + assertThat(e.getCause(), instanceOf(TimeoutIOException.class)); + } + long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + assertTrue(costMs >= 500); + assertTrue(costMs < 1000); + // wait for the background task finish + Thread.sleep(2000); + // Now the location should be in cache, so we will not visit meta again. + HRegionLocation loc = LOCATOR + .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500)).get(); + assertEquals(loc.getServerName(), + TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3ae87bd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 0b3e186..f76e240 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -150,33 +150,35 @@ public class TestAsyncSingleRequestRpcRetryingCaller { AtomicBoolean errorTriggered = new AtomicBoolean(false); AtomicInteger count = new AtomicInteger(0); HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); - AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) { - @Override - CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) { - if (tableName.equals(TABLE_NAME)) { - CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); - if (count.getAndIncrement() == 0) { - errorTriggered.set(true); - future.completeExceptionally(new RuntimeException("Inject error!")); - } else { - future.complete(loc); + AsyncRegionLocator mockedLocator = + new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) { + @Override + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + long timeoutNs) { + if (tableName.equals(TABLE_NAME)) { + CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); + if (count.getAndIncrement() == 0) { + errorTriggered.set(true); + future.completeExceptionally(new RuntimeException("Inject error!")); + } else { + future.complete(loc); + } + return future; + } else { + return super.getRegionLocation(tableName, row, timeoutNs); + } + } + + @Override + CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, + byte[] startRowOfCurrentRegion, long timeoutNs) { + return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion, timeoutNs); + } + + @Override + void updateCachedLocation(HRegionLocation loc, Throwable exception) { } - return future; - } else { - return super.getRegionLocation(tableName, row); - } - } - - @Override - CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, - byte[] startRowOfCurrentRegion) { - return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion); - } - - @Override - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - } - }; + }; try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
