This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new c33ceb2  HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for async client
c33ceb2 is described below

commit c33ceb23d3e1b7d1b9b6d957342b5fa3aa70b29f
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Feb 25 16:01:38 2019 +0800

    HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for async client
    
    Signed-off-by: Guanghao Zhang <[email protected]>
---
 .../hbase/client/AsyncNonMetaRegionLocator.java    |  95 +++++++++++--------
 .../hbase/client/AsyncRegionLocatorHelper.java     |  14 ---
 .../TestAsyncTableLocateRegionForDeletedTable.java | 105 +++++++++++++++++++++
 3 files changed, 159 insertions(+), 55 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 1f23a1d..9246adb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -23,7 +23,6 @@ import static 
org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
 import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
 import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
 import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
@@ -218,7 +217,7 @@ class AsyncNonMetaRegionLocator {
         if (loc1.getSeqNum() != loc2.getSeqNum()) {
           return false;
         }
-        if (Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
           return false;
         }
       }
@@ -226,25 +225,42 @@ class AsyncNonMetaRegionLocator {
     return true;
   }
 
-  // return whether we add this loc to cache
-  private boolean addToCache(TableCache tableCache, RegionLocations locs) {
+  // if we successfully add the locations to cache, return the locations, otherwise return the one
+  // which prevents us being added. The upper layer can use this value to complete pending requests.
+  private RegionLocations addToCache(TableCache tableCache, RegionLocations 
locs) {
     LOG.trace("Try adding {} to cache", locs);
     byte[] startKey = 
locs.getDefaultRegionLocation().getRegion().getStartKey();
     for (;;) {
       RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
       if (oldLocs == null) {
-        return true;
-      }
-      RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
-      if (isEqual(mergedLocs, oldLocs)) {
-        // the merged one is the same with the old one, give up
-        LOG.trace("Will not add {} to cache because the old value {} " +
-          " is newer than us or has the same server name." +
-          " Maybe it is updated before we replace it", locs, oldLocs);
-        return false;
+        return locs;
       }
-      if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
-        return true;
+      // check whether the regions are the same, this usually happens when table is split/merged, or
+      // deleted and recreated again.
+      RegionInfo region = locs.getRegionLocation().getRegion();
+      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+        if (isEqual(mergedLocs, oldLocs)) {
+          // the merged one is the same with the old one, give up
+          LOG.trace("Will not add {} to cache because the old value {} " +
+            " is newer than us or has the same server name." +
+            " Maybe it is updated before we replace it", locs, oldLocs);
+          return oldLocs;
+        }
+        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
+          return mergedLocs;
+        }
+      } else {
+        // the region is different, here we trust the one we fetched. This maybe wrong but finally
+        // the upper layer can detect this and trigger removal of the wrong locations
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The newnly fetch region {} is different from the old one 
{} for row '{}'," +
+            " try replaing the old one...", region, oldRegion, 
Bytes.toStringBinary(startKey));
+        }
+        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
+          return locs;
+        }
       }
     }
   }
@@ -258,37 +274,35 @@ class AsyncNonMetaRegionLocator {
     Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
     if (locs != null) {
-      if (!addToCache(tableCache, locs)) {
-        // someone is ahead of us.
-        synchronized (tableCache) {
-          tableCache.pendingRequests.remove(req);
-          tableCache.clearCompletedRequests(Optional.empty());
-          // Remove a complete locate request in a synchronized block, so the 
table cache must have
-          // quota to send a candidate request.
-          toSend = tableCache.getCandidate();
-          toSend.ifPresent(r -> tableCache.send(r));
-        }
-        toSend.ifPresent(r -> locateInMeta(tableName, r));
-        return;
+      RegionLocations addedLocs = addToCache(tableCache, locs);
+      synchronized (tableCache) {
+        tableCache.pendingRequests.remove(req);
+        tableCache.clearCompletedRequests(Optional.of(addedLocs));
+        // Remove a complete locate request in a synchronized block, so the 
table cache must have
+        // quota to send a candidate request.
+        toSend = tableCache.getCandidate();
+        toSend.ifPresent(r -> tableCache.send(r));
       }
-    }
-    synchronized (tableCache) {
-      tableCache.pendingRequests.remove(req);
-      if (error != null) {
+      toSend.ifPresent(r -> locateInMeta(tableName, r));
+    } else {
+      // we meet an error
+      assert error != null;
+      synchronized (tableCache) {
+        tableCache.pendingRequests.remove(req);
         // fail the request itself, no matter whether it is a 
DoNotRetryIOException, as we have
         // already retried several times
         CompletableFuture<?> future = tableCache.allRequests.remove(req);
         if (future != null) {
           future.completeExceptionally(error);
         }
+        tableCache.clearCompletedRequests(Optional.empty());
+        // Remove a complete locate request in a synchronized block, so the 
table cache must have
+        // quota to send a candidate request.
+        toSend = tableCache.getCandidate();
+        toSend.ifPresent(r -> tableCache.send(r));
       }
-      tableCache.clearCompletedRequests(Optional.ofNullable(locs));
-      // Remove a complete locate request in a synchronized block, so the 
table cache must have
-      // quota to send a candidate request.
-      toSend = tableCache.getCandidate();
-      toSend.ifPresent(r -> tableCache.send(r));
+      toSend.ifPresent(r -> locateInMeta(tableName, r));
     }
-    toSend.ifPresent(r -> locateInMeta(tableName, r));
   }
 
   // return whether we should stop the scan
@@ -443,10 +457,9 @@ class AsyncNonMetaRegionLocator {
                 if (info == null || info.isOffline() || info.isSplitParent()) {
                   continue;
                 }
-                if (addToCache(tableCache, locs)) {
-                  synchronized (tableCache) {
-                    tableCache.clearCompletedRequests(Optional.of(locs));
-                  }
+                RegionLocations addedLocs = addToCache(tableCache, locs);
+                synchronized (tableCache) {
+                  tableCache.clearCompletedRequests(Optional.of(addedLocs));
                 }
               }
             }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index dd516ec..4dde1bb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -129,20 +129,6 @@ final class AsyncRegionLocatorHelper {
     }
   }
 
-  /**
-   * Create a new {@link RegionLocations} which is the merging result for the 
given two
-   * {@link RegionLocations}.
-   * <p/>
-   * All the {@link RegionLocations} in async locator related class are 
immutable because we want to
-   * access them concurrently, so here we need to create a new one, instead of 
calling
-   * {@link RegionLocations#mergeLocations(RegionLocations)} directly.
-   */
-  static RegionLocations mergeRegionLocations(RegionLocations newLocs, 
RegionLocations oldLocs) {
-    RegionLocations locs = new RegionLocations(newLocs.getRegionLocations());
-    locs.mergeLocations(oldLocs);
-    return locs;
-  }
-
   static boolean isGood(RegionLocations locs, int replicaId) {
     if (locs == null) {
       return false;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java
new file mode 100644
index 0000000..6ccd9bc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Fix an infinite loop in {@link AsyncNonMetaRegionLocator}, see the comments on HBASE-21943 for
+ * more details.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableLocateRegionForDeletedTable {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestAsyncTableLocateRegionForDeletedTable.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    TEST_UTIL.getAdmin().balancerSwitch(false, true);
+    ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    assertFalse(ASYNC_CONN.isClosed());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    IOUtils.closeQuietly(ASYNC_CONN);
+    assertTrue(ASYNC_CONN.isClosed());
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, 
VALUE));
+      }
+    }
+    TEST_UTIL.getAdmin().split(TABLE_NAME, Bytes.toBytes(50));
+    TEST_UTIL.waitFor(60000,
+      () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() == 
2);
+    // make sure we can access the split regions
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        assertFalse(table.get(new Get(Bytes.toBytes(i))).isEmpty());
+      }
+    }
+    // let's cache the two old locations
+    AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(TABLE_NAME);
+    locator.getRegionLocation(Bytes.toBytes(0)).join();
+    locator.getRegionLocation(Bytes.toBytes(99)).join();
+    // recreate the table
+    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+    TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    // confirm that we can still get the correct location
+    assertFalse(ASYNC_CONN.getTable(TABLE_NAME).exists(new 
Get(Bytes.toBytes(99))).join());
+  }
+}

Reply via email to