This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2e94f6fb504 HBASE-27527 Port HBASE-27498 to branch-2 (#4919)
2e94f6fb504 is described below

commit 2e94f6fb504737ffdfa2567e6e2a9ea5aa07b8b8
Author: Vaibhav Joshi <vjo...@cloudera.com>
AuthorDate: Tue Dec 13 00:31:09 2022 +0530

    HBASE-27527 Port HBASE-27498 to branch-2 (#4919)
    
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
---
 .../hbase/client/ConnectionImplementation.java     |  82 ++++++++---
 .../hbase/client/TestConnectionImplementation.java | 160 +++++++++++++++++++++
 2 files changed, 221 insertions(+), 21 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 3ed3a06107c..7b9dbc54dec 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -96,6 +96,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
@@ -179,6 +180,9 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
 @InterfaceAudience.Private
 public class ConnectionImplementation implements ClusterConnection, Closeable {
   public static final String RETRIES_BY_SERVER_KEY = 
"hbase.client.retries.by.server";
+
+  public static final String MASTER_STATE_CACHE_TIMEOUT_SEC =
+    "hbase.client.master.state.cache.timeout.sec";
   private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionImplementation.class);
 
   // The mode tells if HedgedRead, LoadBalance mode is supported.
@@ -261,6 +265,12 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
   /** lock guards against multiple threads trying to query the meta region at 
the same time */
   private final ReentrantLock userRegionLock = new ReentrantLock();
 
+  /**
+   * Supplier to get masterState.By default uses simple supplier without TTL 
cache. When
+   * hbase.client.master.state.cache.timeout.sec > 0 it uses TTL Cache.
+   */
+  private final Supplier<Boolean> masterStateSupplier;
+
   private ChoreService choreService;
 
   /**
@@ -395,6 +405,39 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
       default:
         // Doing nothing
     }
+
+    long masterStateCacheTimeout = 
conf.getLong(MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
+
+    Supplier<Boolean> masterConnSupplier = masterConnectionStateSupplier();
+    if (masterStateCacheTimeout <= 0L) {
+      this.masterStateSupplier = masterConnSupplier;
+    } else {
+      this.masterStateSupplier = 
Suppliers.memoizeWithExpiration(masterConnSupplier::get,
+        masterStateCacheTimeout, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Visible for tests
+   */
+  Supplier<Boolean> masterConnectionStateSupplier() {
+    return () -> {
+      if (this.masterServiceState.getStub() == null) {
+        return false;
+      }
+      try {
+        LOG.info("Getting master state using rpc call");
+        return this.masterServiceState.isMasterRunning();
+      } catch (UndeclaredThrowableException e) {
+        // It's somehow messy, but we can receive exceptions such as
+        // java.net.ConnectException but they're not declared. So we catch 
it...
+        LOG.info("Master connection is not running anymore", 
e.getUndeclaredThrowable());
+        return false;
+      } catch (IOException se) {
+        LOG.warn("Checking master connection", se);
+        return false;
+      }
+    };
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
@@ -1268,7 +1311,6 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
    * Class to make a MasterServiceStubMaker stub.
    */
   private final class MasterServiceStubMaker {
-
     private void isMasterRunning(MasterProtos.MasterService.BlockingInterface 
stub)
       throws IOException {
       try {
@@ -1368,6 +1410,13 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
 
   final MasterServiceState masterServiceState = new MasterServiceState(this);
 
+  /**
+   * Visible for tests
+   */
+  MasterServiceState getMasterServiceState() {
+    return this.masterServiceState;
+  }
+
   @Override
   public MasterKeepAliveConnection getMaster() throws IOException {
     return getKeepAliveMasterService();
@@ -1378,13 +1427,16 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
   }
 
   private MasterKeepAliveConnection getKeepAliveMasterService() throws 
IOException {
-    synchronized (masterLock) {
-      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
-        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
-        this.masterServiceState.stub = stubMaker.makeStub();
+    if (!isKeepAliveMasterConnectedAndRunning()) {
+      synchronized (masterLock) {
+        if (!isKeepAliveMasterConnectedAndRunning()) {
+          MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
+          this.masterServiceState.stub = stubMaker.makeStub();
+        }
+        resetMasterServiceState(this.masterServiceState);
       }
-      resetMasterServiceState(this.masterServiceState);
     }
+
     // Ugly delegation just so we can add in a Close method.
     final MasterProtos.MasterService.BlockingInterface stub = 
this.masterServiceState.stub;
     return new MasterKeepAliveConnection() {
@@ -1977,21 +2029,9 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
     }
   }
 
-  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) 
{
-    if (mss.getStub() == null) {
-      return false;
-    }
-    try {
-      return mss.isMasterRunning();
-    } catch (UndeclaredThrowableException e) {
-      // It's somehow messy, but we can receive exceptions such as
-      // java.net.ConnectException but they're not declared. So we catch it...
-      LOG.info("Master connection is not running anymore", 
e.getUndeclaredThrowable());
-      return false;
-    } catch (IOException se) {
-      LOG.warn("Checking master connection", se);
-      return false;
-    }
+  private boolean isKeepAliveMasterConnectedAndRunning() {
+    LOG.info("Getting master connection state from TTL Cache");
+    return masterStateSupplier.get();
   }
 
   void releaseMaster(MasterServiceState mss) {
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
new file mode 100644
index 00000000000..75c6a6358f4
--- /dev/null
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@Category({ ClientTests.class, MediumTests.class })
+@RunWith(MockitoJUnitRunner.class)
+public class TestConnectionImplementation {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestConnectionImplementation.class);
+  private static final IntegrationTestingUtility TEST_UTIL = new 
IntegrationTestingUtility();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testGetMaster_noCachedMasterState() throws IOException, 
IllegalAccessException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0L);
+    ConnectionImplementation conn =
+      new ConnectionImplementation(conf, null, 
UserProvider.instantiate(conf).getCurrent());
+    ConnectionImplementation.MasterServiceState masterServiceState = 
spyMasterServiceState(conn);
+    conn.getMaster(); // This initializes the stubs but don't call 
isMasterRunning
+    conn.getMaster(); // Calls isMasterRunning since stubs are initialized. 
Invocation 1
+    conn.getMaster(); // Calls isMasterRunning since stubs are initialized. 
Invocation 2
+    Mockito.verify(masterServiceState, Mockito.times(2)).isMasterRunning();
+    conn.close();
+  }
+
+  @Test
+  public void testGetMaster_masterStateCacheHit() throws IOException, 
IllegalAccessException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 15L);
+    ConnectionImplementation conn =
+      new ConnectionImplementation(conf, null, 
UserProvider.instantiate(conf).getCurrent());
+    ConnectionImplementation.MasterServiceState masterServiceState = 
spyMasterServiceState(conn);
+    conn.getMaster(); // This initializes the stubs but don't call 
isMasterRunning
+    conn.getMaster(); // Uses cached value, don't call isMasterRunning
+    conn.getMaster(); // Uses cached value, don't call isMasterRunning
+    Mockito.verify(masterServiceState, Mockito.times(0)).isMasterRunning();
+    conn.close();
+  }
+
+  @Test
+  public void testGetMaster_masterStateCacheMiss()
+    throws IOException, InterruptedException, IllegalAccessException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 5L);
+    ConnectionImplementation conn =
+      new ConnectionImplementation(conf, null, 
UserProvider.instantiate(conf).getCurrent());
+    ConnectionImplementation.MasterServiceState masterServiceState = 
spyMasterServiceState(conn);
+    conn.getMaster(); // This initializes the stubs but don't call 
isMasterRunning
+    conn.getMaster(); // Uses cached value, don't call isMasterRunning
+    conn.getMaster(); // Uses cached value, don't call isMasterRunning
+    Thread.sleep(10000);
+    conn.getMaster(); // Calls isMasterRunning after cache expiry. Invocation 1
+    Mockito.verify(masterServiceState, Mockito.times(1)).isMasterRunning();
+    conn.close();
+  }
+
+  @Test
+  public void 
testIsKeepAliveMasterConnectedAndRunning_UndeclaredThrowableException()
+    throws IOException, IllegalAccessException, NoSuchMethodException, 
InvocationTargetException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
+    ConnectionImplementation conn =
+      new ConnectionImplementation(conf, null, 
UserProvider.instantiate(conf).getCurrent());
+    conn.getMaster(); // Initializes stubs
+
+    ConnectionImplementation.MasterServiceState masterServiceState = 
spyMasterServiceState(conn);
+    Mockito.doThrow(new UndeclaredThrowableException(new Exception("DUMMY 
EXCEPTION")))
+      .when(masterServiceState).isMasterRunning();
+
+    // Verify that masterState is "false" because of to injected exception
+    boolean isKeepAliveMasterRunning =
+      (boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
+    Assert.assertFalse(isKeepAliveMasterRunning);
+    conn.close();
+  }
+
+  @Test
+  public void testIsKeepAliveMasterConnectedAndRunning_IOException()
+    throws IOException, IllegalAccessException, NoSuchMethodException, 
InvocationTargetException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
+    ConnectionImplementation conn =
+      new ConnectionImplementation(conf, null, 
UserProvider.instantiate(conf).getCurrent());
+    conn.getMaster();
+
+    ConnectionImplementation.MasterServiceState masterServiceState = 
spyMasterServiceState(conn);
+    Mockito.doThrow(new IOException("DUMMY 
EXCEPTION")).when(masterServiceState).isMasterRunning();
+
+    boolean isKeepAliveMasterRunning =
+      (boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
+
+    // Verify that masterState is "false" because of to injected exception
+    Assert.assertFalse(isKeepAliveMasterRunning);
+    conn.close();
+  }
+
+  // Spy the masterServiceState object using reflection
+  private ConnectionImplementation.MasterServiceState
+    spyMasterServiceState(ConnectionImplementation conn) throws 
IllegalAccessException {
+    ConnectionImplementation.MasterServiceState spiedMasterServiceState =
+      Mockito.spy(conn.getMasterServiceState());
+    FieldUtils.writeDeclaredField(conn, "masterServiceState", 
spiedMasterServiceState, true);
+    return spiedMasterServiceState;
+  }
+
+  // Get isKeepAliveMasterConnectedAndRunning using reflection
+  private Method getIsKeepAliveMasterConnectedAndRunningMethod() throws 
NoSuchMethodException {
+    Method method =
+      
ConnectionImplementation.class.getDeclaredMethod("isKeepAliveMasterConnectedAndRunning");
+    method.setAccessible(true);
+    return method;
+  }
+}

Reply via email to