Repository: hbase
Updated Branches:
  refs/heads/branch-2 f9e18cf31 -> 3d1a2dbe6
  refs/heads/branch-2.0 cca7fd807 -> 2155766d0
  refs/heads/branch-2.1 b7c2b953b -> c41003f5e
  refs/heads/master aa9e1d051 -> 56ac4705e


HBASE-21196 HTableMultiplexer clears the meta cache after every put operation

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/56ac4705
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/56ac4705
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/56ac4705

Branch: refs/heads/master
Commit: 56ac4705e9a89c3b01eb42eb9e1ceac358d414b3
Parents: aa9e1d0
Author: Nihal Jain <nihaljain...@gmail.com>
Authored: Sat Sep 15 00:19:04 2018 +0530
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Fri Sep 28 16:35:46 2018 -0700

----------------------------------------------------------------------
 .../hbase/client/AsyncRequestFutureImpl.java    |   4 +-
 .../hbase/client/TestRegionLocationCaching.java | 177 +++++++++++++++++++
 2 files changed, 180 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/56ac4705/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 80535a1..b75b0c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -826,7 +826,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
       byte[] regionName = regionEntry.getKey();
 
       Throwable regionException = responses.getExceptions().get(regionName);
-      cleanServerCache(server, regionException);
+      if (regionException != null) {
+        cleanServerCache(server, regionException);
+      }
 
       Map<Integer, Object> regionResults =
         results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();

http://git-wip-us.apache.org/repos/asf/hbase/blob/56ac4705/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
new file mode 100644
index 0000000..0134276
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestRegionLocationCaching {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionLocationCaching.class);
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static int SLAVES = 1;
+  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
+  private static TableName TABLE_NAME = TableName.valueOf("TestRegionLocationCaching");
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+    TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY });
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCachingForHTableMultiplexerSinglePut() throws Exception {
+    HTableMultiplexer multiplexer =
+        new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
+    byte[] row = Bytes.toBytes("htable_multiplexer_single_put");
+    byte[] value = Bytes.toBytes("value");
+
+    Put put = new Put(row);
+    put.addColumn(FAMILY, QUALIFIER, value);
+    assertTrue("Put request not accepted by multiplexer queue", multiplexer.put(TABLE_NAME, put));
+
+    checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection());
+    checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER);
+
+    multiplexer.close();
+  }
+
+  @Test
+  public void testCachingForHTableMultiplexerMultiPut() throws Exception {
+    HTableMultiplexer multiplexer =
+        new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
+
+    List<Put> multiput = new ArrayList<Put>();
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("htable_multiplexer_multi_put" + i));
+      byte[] value = Bytes.toBytes("value_" + i);
+      put.addColumn(FAMILY, QUALIFIER, value);
+      multiput.add(put);
+    }
+
+    List<Put> failedPuts = multiplexer.put(TABLE_NAME, multiput);
+    assertNull("All put requests were not accepted by multiplexer queue", failedPuts);
+
+    checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection());
+    for (int i = 0; i < 10; i++) {
+      checkExistence(TABLE_NAME, Bytes.toBytes("htable_multiplexer_multi_put" + i), FAMILY,
+        QUALIFIER);
+    }
+
+    multiplexer.close();
+  }
+
+  @Test
+  public void testCachingForHTableSinglePut() throws Exception {
+    byte[] row = Bytes.toBytes("htable_single_put");
+    byte[] value = Bytes.toBytes("value");
+
+    Put put = new Put(row);
+    put.addColumn(FAMILY, QUALIFIER, value);
+
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      table.put(put);
+    }
+
+    checkRegionLocationIsCached(TABLE_NAME, TEST_UTIL.getConnection());
+    checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER);
+  }
+
+  @Test
+  public void testCachingForHTableMultiPut() throws Exception {
+    List<Put> multiput = new ArrayList<Put>();
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("htable_multi_put" + i));
+      byte[] value = Bytes.toBytes("value_" + i);
+      put.addColumn(FAMILY, QUALIFIER, value);
+      multiput.add(put);
+    }
+
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      table.put(multiput);
+    }
+    checkRegionLocationIsCached(TABLE_NAME, TEST_UTIL.getConnection());
+    for (int i = 0; i < 10; i++) {
+      checkExistence(TABLE_NAME, Bytes.toBytes("htable_multi_put" + i), FAMILY, QUALIFIER);
+    }
+  }
+
+  /**
+   * Method to check whether the cached region location is non-empty for the given table. It repeats
+   * the same check several times as clearing of cache by some async operations may not reflect
+   * immediately.
+   */
+  private void checkRegionLocationIsCached(final TableName tableName, final Connection conn)
+      throws InterruptedException, IOException {
+    for (int count = 0; count < 50; count++) {
+      int number = ((ConnectionImplementation) conn).getNumberOfCachedRegionLocations(tableName);
+      assertNotEquals("Expected non-zero number of cached region locations", 0, number);
+      Thread.sleep(100);
+    }
+  }
+
+  /**
+   * Method to check whether the passed row exists in the given table
+   */
+  private static void checkExistence(final TableName tableName, final byte[] row,
+      final byte[] family, final byte[] qualifier) throws Exception {
+    // verify that the row exists
+    Result r;
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    int nbTry = 0;
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      do {
+        assertTrue("Failed to get row after " + nbTry + " tries", nbTry < 50);
+        nbTry++;
+        Thread.sleep(100);
+        r = table.get(get);
+      } while (r == null || r.getValue(family, qualifier) == null);
+    }
+  }
+}

Reply via email to