[
https://issues.apache.org/jira/browse/HDFS-14750?focusedWorklogId=767107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767107
]
ASF GitHub Bot logged work on HDFS-14750:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/22 09:49
Start Date: 06/May/22 09:49
Worklog Time Spent: 10m
Work Description: ferhui commented on code in PR #4199:
URL: https://github.com/apache/hadoop/pull/4199#discussion_r866622590
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+
+/**
+ * Dynamic fairness policy extending {@link
StaticRouterRpcFairnessPolicyController}
+ * and fetching handlers from configuration for all available name services.
+ * The handlers count changes according to traffic to namespaces.
+ * Total handlers might NOT strictly add up to the value defined by
DFS_ROUTER_HANDLER_COUNT_KEY.
+ */
+public class DynamicRouterRpcFairnessPolicyController
+ extends StaticRouterRpcFairnessPolicyController {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
+
+ private static final ScheduledExecutorService scheduledExecutor =
HadoopExecutors
+ .newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder().setDaemon(true)
+
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
+ private PermitsResizerService permitsResizerService;
+ private ScheduledFuture<?> refreshTask;
+ private int handlerCount;
+
+ /**
+ * Initializes using the same logic as {@link
StaticRouterRpcFairnessPolicyController}
+ * and starts a periodic semaphore resizer thread
+ *
+ * @param conf configuration
+ */
+ public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
+ super(conf);
+ handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+ long refreshInterval =
+
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY,
+ DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
+ permitsResizerService = new PermitsResizerService();
+ refreshTask = scheduledExecutor
+ .scheduleWithFixedDelay(permitsResizerService, refreshInterval,
refreshInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @VisibleForTesting
+ public DynamicRouterRpcFairnessPolicyController(Configuration conf, long
refreshInterval) {
+ super(conf);
+ handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+ permitsResizerService = new PermitsResizerService();
+ refreshTask = scheduledExecutor
+ .scheduleWithFixedDelay(permitsResizerService, refreshInterval,
refreshInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @VisibleForTesting
+ public void refreshPermitsCap() {
+ permitsResizerService.run();
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ if (refreshTask != null) {
+ refreshTask.cancel(true);
+ }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ }
+ }
+
+ class PermitsResizerService implements Runnable {
+
+ @Override
+ public synchronized void run() {
+ long totalOps = 0;
+ Map<String, Long> nsOps = new HashMap<>();
+ for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
+ long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ?
+ rejectedPermitsPerNs.get(entry.getKey()).longValue() :
+ 0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ?
+ acceptedPermitsPerNs.get(entry.getKey()).longValue() :
+ 0);
+ nsOps.put(entry.getKey(), ops);
+ totalOps += ops;
+ }
+
+ for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
+ String ns = entry.getKey();
+ AdjustableSemaphore semaphore = entry.getValue();
+ int oldPermitCap = permitSizes.get(ns);
+ int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps *
handlerCount);
+ // Leave at least 1 handler even if there's no traffic
+ if (newPermitCap == 0) {
+ newPermitCap = 1;
Review Comment:
How about adding a new configuration for this minimum?
e.g. xxx.min.handler.count
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+
+/**
+ * Dynamic fairness policy extending {@link
StaticRouterRpcFairnessPolicyController}
+ * and fetching handlers from configuration for all available name services.
+ * The handlers count changes according to traffic to namespaces.
+ * Total handlers might NOT strictly add up to the value defined by
DFS_ROUTER_HANDLER_COUNT_KEY.
+ */
+public class DynamicRouterRpcFairnessPolicyController
+ extends StaticRouterRpcFairnessPolicyController {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
+
+ private static final ScheduledExecutorService scheduledExecutor =
HadoopExecutors
+ .newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder().setDaemon(true)
+
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
+ private PermitsResizerService permitsResizerService;
+ private ScheduledFuture<?> refreshTask;
+ private int handlerCount;
+
+ /**
+ * Initializes using the same logic as {@link
StaticRouterRpcFairnessPolicyController}
+ * and starts a periodic semaphore resizer thread
+ *
+ * @param conf configuration
+ */
+ public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
+ super(conf);
+ handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+ long refreshInterval =
+
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY,
+ DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
+ permitsResizerService = new PermitsResizerService();
+ refreshTask = scheduledExecutor
+ .scheduleWithFixedDelay(permitsResizerService, refreshInterval,
refreshInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @VisibleForTesting
+ public DynamicRouterRpcFairnessPolicyController(Configuration conf, long
refreshInterval) {
+ super(conf);
+ handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+ permitsResizerService = new PermitsResizerService();
+ refreshTask = scheduledExecutor
+ .scheduleWithFixedDelay(permitsResizerService, refreshInterval,
refreshInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @VisibleForTesting
+ public void refreshPermitsCap() {
+ permitsResizerService.run();
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ if (refreshTask != null) {
+ refreshTask.cancel(true);
+ }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ }
+ }
+
+ class PermitsResizerService implements Runnable {
+
+ @Override
+ public synchronized void run() {
+ long totalOps = 0;
+ Map<String, Long> nsOps = new HashMap<>();
+ for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
+ long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ?
+ rejectedPermitsPerNs.get(entry.getKey()).longValue() :
+ 0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ?
+ acceptedPermitsPerNs.get(entry.getKey()).longValue() :
+ 0);
+ nsOps.put(entry.getKey(), ops);
+ totalOps += ops;
+ }
+
+ for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
Review Comment:
It seems that the sum of newPermitCap over all name services may not equal handlerCount, right?
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+
+import static
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
+/**
+ * Test functionality of {@link DynamicRouterRpcFairnessPolicyController).
+ */
+public class TestDynamicRouterRpcFairnessPolicyController {
+
+ private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+
+ @Test
+ public void testDynamicControllerSimple() throws InterruptedException {
+ verifyDynamicControllerSimple(true);
+ verifyDynamicControllerSimple(false);
+ }
+
+ @Test
+ public void testDynamicControllerAllPermitsAcquired() throws
InterruptedException {
+ verifyDynamicControllerAllPermitsAcquired(true);
+ verifyDynamicControllerAllPermitsAcquired(false);
+ }
+
+ private void verifyDynamicControllerSimple(boolean manualRefresh)
+ throws InterruptedException {
+ // 3 permits each ns
+ DynamicRouterRpcFairnessPolicyController controller;
+ if (manualRefresh) {
+ controller = getFairnessPolicyController(9);
+ } else {
+ controller = getFairnessPolicyController(9, 4000);
+ }
+ for (int i = 0; i < 3; i++) {
+ Assert.assertTrue(controller.acquirePermit("ns1"));
+ Assert.assertTrue(controller.acquirePermit("ns2"));
+ Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
+ }
+ Assert.assertFalse(controller.acquirePermit("ns1"));
+ Assert.assertFalse(controller.acquirePermit("ns2"));
+ Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
+
+ // Release all permits
+ for (int i = 0; i < 3; i++) {
+ controller.releasePermit("ns1");
+ controller.releasePermit("ns2");
+ controller.releasePermit(CONCURRENT_NS);
+ }
+
+ // Inject dummy metrics
+ // Split half half for ns1 and concurrent
+ Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
+ Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
+ injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
+ injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
+ injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
+ controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+
+ if (manualRefresh) {
+ controller.refreshPermitsCap();
+ } else {
+ Thread.sleep(5000);
Review Comment:
Better to use GenericTestUtils.waitFor instead of sleep.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+
+import static
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
+/**
+ * Test functionality of {@link DynamicRouterRpcFairnessPolicyController).
+ */
+public class TestDynamicRouterRpcFairnessPolicyController {
+
+ private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+
+ @Test
+ public void testDynamicControllerSimple() throws InterruptedException {
+ verifyDynamicControllerSimple(true);
+ verifyDynamicControllerSimple(false);
+ }
+
+ @Test
+ public void testDynamicControllerAllPermitsAcquired() throws
InterruptedException {
+ verifyDynamicControllerAllPermitsAcquired(true);
+ verifyDynamicControllerAllPermitsAcquired(false);
+ }
+
+ private void verifyDynamicControllerSimple(boolean manualRefresh)
+ throws InterruptedException {
+ // 3 permits each ns
+ DynamicRouterRpcFairnessPolicyController controller;
+ if (manualRefresh) {
+ controller = getFairnessPolicyController(9);
+ } else {
+ controller = getFairnessPolicyController(9, 4000);
+ }
+ for (int i = 0; i < 3; i++) {
+ Assert.assertTrue(controller.acquirePermit("ns1"));
+ Assert.assertTrue(controller.acquirePermit("ns2"));
+ Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
+ }
+ Assert.assertFalse(controller.acquirePermit("ns1"));
+ Assert.assertFalse(controller.acquirePermit("ns2"));
+ Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
+
+ // Release all permits
+ for (int i = 0; i < 3; i++) {
+ controller.releasePermit("ns1");
+ controller.releasePermit("ns2");
+ controller.releasePermit(CONCURRENT_NS);
+ }
+
+ // Inject dummy metrics
+ // Split half half for ns1 and concurrent
+ Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
+ Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
+ injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
+ injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
+ injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
+ controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+
+ if (manualRefresh) {
+ controller.refreshPermitsCap();
+ } else {
+ Thread.sleep(5000);
+ }
+
+ // Current permits count should be 5:1:5
Review Comment:
The total of the old permits does not equal the total of the new permits (3 + 3 + 3 = 9 before vs. 5 + 1 + 5 = 11 after).
Issue Time Tracking
-------------------
Worklog Id: (was: 767107)
Time Spent: 0.5h (was: 20m)
> RBF: Improved isolation for downstream name nodes. {Dynamic}
> ------------------------------------------------------------
>
> Key: HDFS-14750
> URL: https://issues.apache.org/jira/browse/HDFS-14750
> Project: Hadoop HDFS
> Issue Type: Improvement
> Reporter: CR Hota
> Assignee: CR Hota
> Priority: Major
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> This Jira tracks the work around dynamic allocation of resources in routers
> for downstream hdfs clusters.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]