This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 7bedb96601a HDFS-17030 Limit wait time for getHAServiceState in ObserverReadProxyProvider (#5878) 7bedb96601a is described below commit 7bedb96601a25244926f87ec14e36d1bdeec5f28 Author: Xing Lin <linxing...@gmail.com> AuthorDate: Thu Aug 10 17:35:58 2023 -0700 HDFS-17030 Limit wait time for getHAServiceState in ObserverReadProxyProvider (#5878) --- .../namenode/ha/ObserverReadProxyProvider.java | 90 ++++++++- .../namenode/ha/TestObserverReadProxyProvider.java | 215 ++++++++++++++++++++- 2 files changed, 296 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 9cabeb9037f..f741f38b4d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -24,7 +24,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -46,6 +51,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Time; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +94,17 @@ public class ObserverReadProxyProvider<T> /** Observer probe retry period default to 10 min. */ static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000; + /** + * Timeout in ms to cancel the ha-state probe rpc request for a namenode. + * To disable timeout, set it to 0 or a negative value. + */ + static final String NAMENODE_HA_STATE_PROBE_TIMEOUT = + HdfsClientConfigKeys.Failover.PREFIX + "namenode.ha-state.probe.timeout"; + /** + * Default to disable namenode ha-state probe timeout. + */ + static final long NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT = 0; + /** The inner proxy provider used for active/standby failover. */ private final AbstractNNFailoverProxyProvider<T> failoverProxy; /** List of all NameNode proxies. */ @@ -155,12 +172,22 @@ public class ObserverReadProxyProvider<T> */ private long observerProbeRetryPeriodMs; + /** + * Timeout in ms when we try to get the HA state of a namenode. + */ + private long namenodeHAStateProbeTimeoutMs; + /** * The previous time where zero observer were found. If there was observer, * or it is initialization, this is set to 0. */ private long lastObserverProbeTime; + /** + * Threadpool to send the getHAServiceState requests. + */ + private final BlockingThreadPoolExecutorService nnProbingThreadPool; + /** * By default ObserverReadProxyProvider uses * {@link ConfiguredFailoverProxyProvider} for failover. 
@@ -213,6 +240,8 @@ public class ObserverReadProxyProvider<T> observerProbeRetryPeriodMs = conf.getTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); + namenodeHAStateProbeTimeoutMs = conf.getTimeDuration(NAMENODE_HA_STATE_PROBE_TIMEOUT, + NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); // TODO : make this configurable or remove this variable if (wrappedProxy instanceof ClientProtocol) { @@ -222,6 +251,15 @@ public class ObserverReadProxyProvider<T> + "class does not implement {}", uri, ClientProtocol.class.getName()); this.observerReadEnabled = false; } + + /* + * At most 4 threads will be running and each thread will die after 10 + * seconds of no use. Up to 132 tasks (4 active + 128 waiting) can be + * submitted simultaneously. + */ + nnProbingThreadPool = + BlockingThreadPoolExecutorService.newInstance(4, 128, 10L, TimeUnit.SECONDS, + "nn-ha-state-probing"); } public AlignmentContext getAlignmentContext() { @@ -285,13 +323,62 @@ public class ObserverReadProxyProvider<T> } currentIndex = (currentIndex + 1) % nameNodeProxies.size(); currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); - currentProxy.setCachedState(getHAServiceState(currentProxy)); + currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy)); LOG.debug("Changed current proxy from {} to {}", initial == null ? "none" : initial.proxyInfo, currentProxy.proxyInfo); return currentProxy; } + /** + * Execute getHAServiceState() call with a timeout, to avoid a long wait when + * an NN becomes unresponsive to rpc requests + * (when a thread/heap dump is being taken, e.g.). + * + * For each getHAServiceState() call, a task is created and submitted to a + * threadpool for execution. We will wait for a response up to + * namenodeHAStateProbeTimeoutMs and cancel these requests if they time out. + * + * The implementation is split into two functions so that we can unit test + * the second function. 
+ */ + HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) { + try { + Future<HAServiceState> task = nnProbingThreadPool.submit(() -> getHAServiceState(proxyInfo)); + return getHAServiceStateWithTimeout(proxyInfo, task); + } catch (RejectedExecutionException e) { + LOG.warn("Run out of threads to submit the request to query HA state. " + + "Ok to return null and we will fallback to use active NN to serve " + + "this request."); + return null; + } + } + + HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo, + Future<HAServiceState> task) { + HAServiceState state = null; + try { + if (namenodeHAStateProbeTimeoutMs > 0) { + state = task.get(namenodeHAStateProbeTimeoutMs, TimeUnit.MILLISECONDS); + } else { + // Disable timeout by waiting indefinitely when namenodeHAStateProbeTimeoutMs is set to 0 + // or a negative value. + state = task.get(); + } + LOG.debug("HA State for {} is {}", proxyInfo.proxyInfo, state); + } catch (TimeoutException e) { + // Cancel the task on timeout + String msg = String.format("Cancel NN probe task due to timeout for %s", proxyInfo.proxyInfo); + LOG.warn(msg, e); + task.cancel(true); + } catch (InterruptedException|ExecutionException e) { + String msg = String.format("Exception in NN probe task for %s", proxyInfo.proxyInfo); + LOG.warn(msg, e); + } + + return state; + } + /** * Fetch the service state from a proxy. If it is unable to be fetched, * assume it is in standby state, but log the exception. 
@@ -554,6 +641,7 @@ public class ObserverReadProxyProvider<T> } } failoverProxy.close(); + nnProbingThreadPool.shutdown(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index 9cd6c6a1061..69e49cecf2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import java.io.IOException; import java.net.InetSocketAddress; @@ -24,7 +26,10 @@ import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; @@ -36,20 +41,29 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.util.StopWatch; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.event.Level; import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import static 
org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; /** @@ -58,29 +72,42 @@ import static org.mockito.Mockito.when; * NameNode to communicate with. */ public class TestObserverReadProxyProvider { + private final static long SLOW_RESPONSE_SLEEP_TIME = TimeUnit.SECONDS.toMillis(5); // 5 s + private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT = TimeUnit.SECONDS.toMillis(2); + private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG = TimeUnit.SECONDS.toMillis(25); + private final GenericTestUtils.LogCapturer proxyLog = + GenericTestUtils.LogCapturer.captureLogs(ObserverReadProxyProvider.LOG); private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0]; private String ns; private URI nnURI; - private Configuration conf; private ObserverReadProxyProvider<ClientProtocol> proxyProvider; private NameNodeAnswer[] namenodeAnswers; private String[] namenodeAddrs; + @BeforeClass + public static void setLogLevel() { + GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG); + } + @Before public void setup() throws Exception { ns = "testcluster"; nnURI = URI.create("hdfs://" + ns); - conf = new Configuration(); - conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); - // Set observer probe retry period to 0. 
Required by the tests that - // transition observer back and forth - conf.setTimeDuration( - OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); } private void setupProxyProvider(int namenodeCount) throws Exception { + setupProxyProvider(namenodeCount, new Configuration()); + } + + private void setupProxyProvider(int namenodeCount, long nnHAStateProbeTimeout) throws Exception { + Configuration conf = new Configuration(); + conf.setLong(NAMENODE_HA_STATE_PROBE_TIMEOUT, nnHAStateProbeTimeout); + setupProxyProvider(namenodeCount, conf); + } + + private void setupProxyProvider(int namenodeCount, Configuration conf) throws Exception { String[] namenodeIDs = new String[namenodeCount]; namenodeAddrs = new String[namenodeCount]; namenodeAnswers = new NameNodeAnswer[namenodeCount]; @@ -103,6 +130,12 @@ public class TestObserverReadProxyProvider { } conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, Joiner.on(",").join(namenodeIDs)); + conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); + // Set observer probe retry period to 0. Required by the tests that + // transition observer back and forth + conf.setTimeDuration( + OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); + conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false); proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI, ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() { @@ -144,7 +177,7 @@ public class TestObserverReadProxyProvider { } }; ObserverReadProxyProvider<GetUserMappingsProtocol> userProxyProvider = - new ObserverReadProxyProvider<>(conf, nnURI, + new ObserverReadProxyProvider<>(proxyProvider.conf, nnURI, GetUserMappingsProtocol.class, proxyFactory); assertArrayEquals(fakeGroups, userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser)); @@ -324,6 +357,160 @@ public class TestObserverReadProxyProvider { assertHandledBy(1); } + /** + * Happy case for GetHAServiceStateWithTimeout. 
+ */ + @Test + public void testGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + final HAServiceState state = HAServiceState.STANDBY; + @SuppressWarnings("unchecked") NNProxyInfo<ClientProtocol> dummyNNProxyInfo = + (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class); + @SuppressWarnings("unchecked") Future<HAServiceState> task = mock(Future.class); + when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state); + + HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertEquals(state, state2); + verify(task).get(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state)); + proxyLog.clearOutput(); + } + + /** + * Test TimeoutException for GetHAServiceStateWithTimeout. + */ + @Test + public void testTimeoutExceptionGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + @SuppressWarnings("unchecked") NNProxyInfo<ClientProtocol> dummyNNProxyInfo = + (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class); + @SuppressWarnings("unchecked") Future<HAServiceState> task = mock(Future.class); + TimeoutException e = new TimeoutException("Timeout"); + when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e); + + HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertNull(state); + verify(task).get(anyLong(), any(TimeUnit.class)); + verify(task).cancel(true); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "Cancel NN probe task due to timeout for " + dummyNNProxyInfo.proxyInfo)); + proxyLog.clearOutput(); + } + + /** + * Test InterruptedException for GetHAServiceStateWithTimeout. 
+ */ + @Test + public void testInterruptedExceptionGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + @SuppressWarnings("unchecked") NNProxyInfo<ClientProtocol> dummyNNProxyInfo = + (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class); + @SuppressWarnings("unchecked") Future<HAServiceState> task = mock(Future.class); + InterruptedException e = new InterruptedException("Interrupted"); + when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e); + + HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertNull(state); + verify(task).get(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo)); + proxyLog.clearOutput(); + } + + /** + * Test ExecutionException for GetHAServiceStateWithTimeout. + */ + @Test + public void testExecutionExceptionGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + @SuppressWarnings("unchecked") NNProxyInfo<ClientProtocol> dummyNNProxyInfo = + (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class); + @SuppressWarnings("unchecked") Future<HAServiceState> task = mock(Future.class); + Exception e = new ExecutionException(new InterruptedException("Interrupted")); + when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e); + + HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertNull(state); + verify(task).get(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo)); + proxyLog.clearOutput(); + } + + /** + * Test GetHAServiceState when timeout is disabled (test the else { 
task.get() } code path). + */ + @Test + public void testGetHAServiceStateWithoutTimeout() throws Exception { + proxyLog.clearOutput(); + setupProxyProvider(1, 0); + + final HAServiceState state = HAServiceState.STANDBY; + @SuppressWarnings("unchecked") NNProxyInfo<ClientProtocol> dummyNNProxyInfo = + (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class); + @SuppressWarnings("unchecked") Future<HAServiceState> task = mock(Future.class); + when(task.get()).thenReturn(state); + + HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertEquals(state, state2); + verify(task).get(); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state)); + proxyLog.clearOutput(); + } + + /** + * Test getHAServiceState when we have a slow NN, using a 25s timeout. + * This is to verify the old behavior without being able to fast-fail (we can also set + * namenodeHAStateProbeTimeoutMs to 0 or a negative value and the rest of the test can stay + * the same). + * + * 5-second (SLOW_RESPONSE_SLEEP_TIME) latency is introduced and we expect that latency is added + * to the READ operation. + */ + @Test + public void testStandbyGetHAServiceStateLongTimeout() throws Exception { + setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setSlowNode(true); + namenodeAnswers[3].setObserverState(); + + StopWatch watch = new StopWatch(); + watch.start(); + doRead(); + long runtime = watch.now(TimeUnit.MILLISECONDS); + assertTrue("Read operation finished earlier than we expected", + runtime > SLOW_RESPONSE_SLEEP_TIME); + } + + /** + * Test getHAServiceState using a 2s timeout with a slow standby. + * Fail the test if we don't complete it in 4s. 
+ */ + @Test(timeout = 4000) + public void testStandbyGetHAServiceStateTimeout() throws Exception { + setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setSlowNode(true); + namenodeAnswers[3].setObserverState(); + + doRead(); + } + private void doRead() throws Exception { doRead(proxyProvider.getProxy().proxy); } @@ -356,6 +543,7 @@ public class TestObserverReadProxyProvider { private volatile boolean unreachable = false; private volatile boolean retryActive = false; + private volatile boolean slowNode = false; // Standby state by default private volatile boolean allowWrites = false; @@ -369,6 +557,12 @@ public class TestObserverReadProxyProvider { if (unreachable) { throw new IOException("Unavailable"); } + + // sleep to simulate slow rpc responses. + if (slowNode) { + Thread.sleep(SLOW_RESPONSE_SLEEP_TIME); + } + // retryActive should be checked before getHAServiceState. // Check getHAServiceState first here only because in test, // it relies read call, which relies on getHAServiceState @@ -415,6 +609,11 @@ public class TestObserverReadProxyProvider { this.unreachable = unreachable; } + // Whether this node should be slow in rpc response. + void setSlowNode(boolean slowNode) { + this.slowNode = slowNode; + } + void setActiveState() { allowReads = true; allowWrites = true; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org