This is an automated email from the ASF dual-hosted git repository. tasanuma pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 3b115da8dd91 HDFS-17362. RBF: Implement RouterObserverReadConfiguredFailoverProxyProvider (#6510) 3b115da8dd91 is described below commit 3b115da8dd91989826be3056a385b86612e95af7 Author: Takanobu Asanuma <tasan...@apache.org> AuthorDate: Tue Feb 13 10:49:39 2024 +0900 HDFS-17362. RBF: Implement RouterObserverReadConfiguredFailoverProxyProvider (#6510) Co-authored-by: Chunyi Yang <cy...@lycorp.co.jp> Co-authored-by: Takanobu Asanuma <tasan...@apache.org> Reviewed-by: Simbarashe Dzinamarira <sdzinamar...@linkedin.com> (cherry picked from commit 5cbe52f4e8d84961bdc5a21b77bcb10bf31e335d) --- ...bserverReadConfiguredFailoverProxyProvider.java | 47 ++++++++++++++++++++++ .../ha/RouterObserverReadProxyProvider.java | 2 +- .../federation/router/TestObserverWithRouter.java | 43 ++++++++++++++++---- 3 files changed, 84 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadConfiguredFailoverProxyProvider.java new file mode 100644 index 000000000000..56a913520eda --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadConfiguredFailoverProxyProvider.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; + +/** + * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation + * to support automatic msync-ing when using routers. + * <p> + * This constructs a wrapper proxy of ConfiguredFailoverProxyProvider, + * and allows to configure logical names for nameservices. + */ +public class RouterObserverReadConfiguredFailoverProxyProvider<T> + extends RouterObserverReadProxyProvider<T> { + + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(RouterObserverReadConfiguredFailoverProxyProvider.class); + + public RouterObserverReadConfiguredFailoverProxyProvider(Configuration conf, URI uri, + Class<T> xface, HAProxyFactory<T> factory) { + super(conf, uri, xface, factory, + new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java index e494e524299b..9707a2a91c5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java @@ -47,7 +47,7 @@ import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvide */ public class RouterObserverReadProxyProvider<T> extends AbstractNNFailoverProxyProvider<T> { @VisibleForTesting - static final Logger LOG = LoggerFactory.getLogger(ObserverReadProxyProvider.class); + static final Logger LOG = LoggerFactory.getLogger(RouterObserverReadProxyProvider.class); /** Client-side context for syncing with the NameNode server side. */ private final AlignmentContext alignmentContext; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index e20e3ad2a0a6..1419b0cee77f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -41,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; @@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeConte import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.test.GenericTestUtils; @@ -72,6 +77,10 @@ public class TestObserverWithRouter { private RouterContext routerContext; private FileSystem fileSystem; + private static final String ROUTER_NS_ID = "router-service"; + private static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = + "dfs.client.failover.observer.auto-msync-period"; + @BeforeEach void init(TestInfo info) throws Exception { if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) { @@ -146,7 +155,8 @@ public class TestObserverWithRouter { public enum ConfigSetting { USE_NAMENODE_PROXY_FLAG, - USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER + USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER, + USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER } private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) { @@ -162,6 +172,16 @@ public class TestObserverWithRouter { .getRpcServerAddress() .getHostName(), RouterObserverReadProxyProvider.class.getName()); break; + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // HA configs + conf.set(DFS_NAMESERVICES, ROUTER_NS_ID); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + ROUTER_NS_ID, "router1"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ "." + ROUTER_NS_ID + ".router1", + routerContext.getFileSystemURI().toString()); + DistributedFileSystem.setDefaultUri(conf, "hdfs://" + ROUTER_NS_ID); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ROUTER_NS_ID, + RouterObserverReadConfiguredFailoverProxyProvider.class.getName()); + break; default: Assertions.fail("Unknown config setting: " + configSetting); } @@ -758,8 +778,10 @@ public class TestObserverWithRouter { @ParameterizedTest public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception { Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); - clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." + - routerContext.getRouter().getRpcServerAddress().getHostName(), 0); + String configKeySuffix = + configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 0); fileSystem = routerContext.getFileSystem(clientConfiguration); List<? extends FederationNamenodeContext> namenodes = routerContext @@ -793,6 +815,7 @@ public class TestObserverWithRouter { assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver); break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: // An msync is sent to each active namenode for each read. // Total msyncs will be (numListings * num_of_nameservices). assertEquals("Msyncs sent to the active namenodes", @@ -809,8 +832,10 @@ public class TestObserverWithRouter { @ParameterizedTest public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception { Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); - clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." + - routerContext.getRouter().getRpcServerAddress().getHostName(), 3000); + String configKeySuffix = + configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000); fileSystem = routerContext.getFileSystem(clientConfiguration); List<? extends FederationNamenodeContext> namenodes = routerContext @@ -843,6 +868,7 @@ public class TestObserverWithRouter { assertEquals("Reads sent to observer", 2, rpcCountForObserver); break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: // 4 msyncs expected. 2 for the first read, and 2 for the third read // after the auto-msync period has elapsed during the sleep. assertEquals("Msyncs sent to the active namenodes", @@ -859,8 +885,10 @@ public class TestObserverWithRouter { @ParameterizedTest public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception { Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); - clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." + - routerContext.getRouter().getRpcServerAddress().getHostName(), 3000); + String configKeySuffix = + configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000); fileSystem = routerContext.getFileSystem(clientConfiguration); List<? extends FederationNamenodeContext> namenodes = routerContext @@ -893,6 +921,7 @@ public class TestObserverWithRouter { assertEquals("Read sent to observer", 1, rpcCountForObserver); break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: // 5 calls to the active namenodes expected. 4 msync and a mkdir. // Each of the 2 reads results in an msync to 2 nameservices. // The mkdir also goes to the active. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org