This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new c33ceb2 HBASE-21943 The usage of RegionLocations.mergeRegionLocations
is wrong for async client
c33ceb2 is described below
commit c33ceb23d3e1b7d1b9b6d957342b5fa3aa70b29f
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Feb 25 16:01:38 2019 +0800
HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for
async client
Signed-off-by: Guanghao Zhang <[email protected]>
---
.../hbase/client/AsyncNonMetaRegionLocator.java | 95 +++++++++++--------
.../hbase/client/AsyncRegionLocatorHelper.java | 14 ---
.../TestAsyncTableLocateRegionForDeletedTable.java | 105 +++++++++++++++++++++
3 files changed, 159 insertions(+), 55 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 1f23a1d..9246adb 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -23,7 +23,6 @@ import static
org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
@@ -218,7 +217,7 @@ class AsyncNonMetaRegionLocator {
if (loc1.getSeqNum() != loc2.getSeqNum()) {
return false;
}
- if (Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+ if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
return false;
}
}
@@ -226,25 +225,42 @@ class AsyncNonMetaRegionLocator {
return true;
}
- // return whether we add this loc to cache
- private boolean addToCache(TableCache tableCache, RegionLocations locs) {
+ // If we successfully add the locations to the cache, return them;
otherwise return the existing
+ // locations that prevented ours from being added. The upper layer can use this value to
complete pending requests.
+ private RegionLocations addToCache(TableCache tableCache, RegionLocations
locs) {
LOG.trace("Try adding {} to cache", locs);
byte[] startKey =
locs.getDefaultRegionLocation().getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
if (oldLocs == null) {
- return true;
- }
- RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
- if (isEqual(mergedLocs, oldLocs)) {
- // the merged one is the same with the old one, give up
- LOG.trace("Will not add {} to cache because the old value {} " +
- " is newer than us or has the same server name." +
- " Maybe it is updated before we replace it", locs, oldLocs);
- return false;
+ return locs;
}
- if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
- return true;
+ // check whether the regions are the same; they usually differ when the
table has been split/merged, or
+ // deleted and recreated again.
+ RegionInfo region = locs.getRegionLocation().getRegion();
+ RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+ if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+ RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+ if (isEqual(mergedLocs, oldLocs)) {
+ // the merged one is the same with the old one, give up
+ LOG.trace("Will not add {} to cache because the old value {} " +
+ " is newer than us or has the same server name." +
+ " Maybe it is updated before we replace it", locs, oldLocs);
+ return oldLocs;
+ }
+ if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
+ return mergedLocs;
+ }
+ } else {
+ // the region is different, here we trust the one we fetched. This
may be wrong, but eventually
+ // the upper layer can detect this and trigger removal of the wrong
locations
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The newnly fetch region {} is different from the old one
{} for row '{}'," +
+ " try replaing the old one...", region, oldRegion,
Bytes.toStringBinary(startKey));
+ }
+ if (tableCache.cache.replace(startKey, oldLocs, locs)) {
+ return locs;
+ }
}
}
}
@@ -258,37 +274,35 @@ class AsyncNonMetaRegionLocator {
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
- if (!addToCache(tableCache, locs)) {
- // someone is ahead of us.
- synchronized (tableCache) {
- tableCache.pendingRequests.remove(req);
- tableCache.clearCompletedRequests(Optional.empty());
- // Remove a complete locate request in a synchronized block, so the
table cache must have
- // quota to send a candidate request.
- toSend = tableCache.getCandidate();
- toSend.ifPresent(r -> tableCache.send(r));
- }
- toSend.ifPresent(r -> locateInMeta(tableName, r));
- return;
+ RegionLocations addedLocs = addToCache(tableCache, locs);
+ synchronized (tableCache) {
+ tableCache.pendingRequests.remove(req);
+ tableCache.clearCompletedRequests(Optional.of(addedLocs));
+ // Remove a complete locate request in a synchronized block, so the
table cache must have
+ // quota to send a candidate request.
+ toSend = tableCache.getCandidate();
+ toSend.ifPresent(r -> tableCache.send(r));
}
- }
- synchronized (tableCache) {
- tableCache.pendingRequests.remove(req);
- if (error != null) {
+ toSend.ifPresent(r -> locateInMeta(tableName, r));
+ } else {
+ // we meet an error
+ assert error != null;
+ synchronized (tableCache) {
+ tableCache.pendingRequests.remove(req);
// fail the request itself, no matter whether it is a
DoNotRetryIOException, as we have
// already retried several times
CompletableFuture<?> future = tableCache.allRequests.remove(req);
if (future != null) {
future.completeExceptionally(error);
}
+ tableCache.clearCompletedRequests(Optional.empty());
+ // Remove a complete locate request in a synchronized block, so the
table cache must have
+ // quota to send a candidate request.
+ toSend = tableCache.getCandidate();
+ toSend.ifPresent(r -> tableCache.send(r));
}
- tableCache.clearCompletedRequests(Optional.ofNullable(locs));
- // Remove a complete locate request in a synchronized block, so the
table cache must have
- // quota to send a candidate request.
- toSend = tableCache.getCandidate();
- toSend.ifPresent(r -> tableCache.send(r));
+ toSend.ifPresent(r -> locateInMeta(tableName, r));
}
- toSend.ifPresent(r -> locateInMeta(tableName, r));
}
// return whether we should stop the scan
@@ -443,10 +457,9 @@ class AsyncNonMetaRegionLocator {
if (info == null || info.isOffline() || info.isSplitParent()) {
continue;
}
- if (addToCache(tableCache, locs)) {
- synchronized (tableCache) {
- tableCache.clearCompletedRequests(Optional.of(locs));
- }
+ RegionLocations addedLocs = addToCache(tableCache, locs);
+ synchronized (tableCache) {
+ tableCache.clearCompletedRequests(Optional.of(addedLocs));
}
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index dd516ec..4dde1bb 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -129,20 +129,6 @@ final class AsyncRegionLocatorHelper {
}
}
- /**
- * Create a new {@link RegionLocations} which is the merging result for the
given two
- * {@link RegionLocations}.
- * <p/>
- * All the {@link RegionLocations} in async locator related class are
immutable because we want to
- * access them concurrently, so here we need to create a new one, instead of
calling
- * {@link RegionLocations#mergeLocations(RegionLocations)} directly.
- */
- static RegionLocations mergeRegionLocations(RegionLocations newLocs,
RegionLocations oldLocs) {
- RegionLocations locs = new RegionLocations(newLocs.getRegionLocations());
- locs.mergeLocations(oldLocs);
- return locs;
- }
-
static boolean isGood(RegionLocations locs, int replicaId) {
if (locs == null) {
return false;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java
new file mode 100644
index 0000000..6ccd9bc
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Fix an infinite loop in {@link AsyncNonMetaRegionLocator}, see the comments
on HBASE-21943 for
+ * more details.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableLocateRegionForDeletedTable {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+
HBaseClassTestRule.forClass(TestAsyncTableLocateRegionForDeletedTable.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+ private static byte[] VALUE = Bytes.toBytes("value");
+
+ private static AsyncConnection ASYNC_CONN;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ TEST_UTIL.getAdmin().balancerSwitch(false, true);
+ ASYNC_CONN =
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ assertFalse(ASYNC_CONN.isClosed());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ IOUtils.closeQuietly(ASYNC_CONN);
+ assertTrue(ASYNC_CONN.isClosed());
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER,
VALUE));
+ }
+ }
+ TEST_UTIL.getAdmin().split(TABLE_NAME, Bytes.toBytes(50));
+ TEST_UTIL.waitFor(60000,
+ () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() ==
2);
+ // make sure we can access the split regions
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ for (int i = 0; i < 100; i++) {
+ assertFalse(table.get(new Get(Bytes.toBytes(i))).isEmpty());
+ }
+ }
+ // let's cache the two old locations
+ AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(TABLE_NAME);
+ locator.getRegionLocation(Bytes.toBytes(0)).join();
+ locator.getRegionLocation(Bytes.toBytes(99)).join();
+ // recreate the table
+ TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+ TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ // confirm that we can still get the correct location
+ assertFalse(ASYNC_CONN.getTable(TABLE_NAME).exists(new
Get(Bytes.toBytes(99))).join());
+ }
+}