[
https://issues.apache.org/jira/browse/HADOOP-19140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878722#comment-17878722
]
ASF GitHub Bot commented on HADOOP-19140:
-----------------------------------------
anujmodi2021 commented on code in PR #6703:
URL: https://github.com/apache/hadoop/pull/6703#discussion_r1741499620
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestIORateLimiter.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.time.Duration;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.impl.IORateLimiterSupport;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.RateLimitingFactory;
+
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_BULK;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE_DIR;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test IO rate limiting in {@link RateLimiting} and {@link IORateLimiter}.
+ * <p>
+ * This includes: illegal arguments, and what if more capacity
+ * is requested than is available.
+ */
+public class TestIORateLimiter extends AbstractHadoopTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestIORateLimiter.class);
+
+ public static final Path ROOT = new Path("/");
+
+ @Test
+ public void testAcquireCapacity() {
+ final int size = 10;
+ final RateLimiting limiter = RateLimitingFactory.create(size);
+ // do a chain of requests
+ limiter.acquire(0);
+ limiter.acquire(1);
+ limiter.acquire(2);
+
+ // now ask for more than is allowed. This MUST work.
+ final int excess = size * 2;
+ limiter.acquire(excess);
+ assertDelayed(limiter, excess);
+ }
+
+ @Test
+ public void testNegativeCapacityRejected() throws Throwable {
+ final RateLimiting limiter = RateLimitingFactory.create(1);
+ intercept(IllegalArgumentException.class, () ->
+ limiter.acquire(-1));
+ }
+
+ @Test
+ public void testNegativeLimiterCapacityRejected() throws Throwable {
+ intercept(IllegalArgumentException.class, () ->
+ RateLimitingFactory.create(-1));
+ }
+
+ /**
+ * This is a key behavior: it is acceptable to ask for more capacity
+ * than the caller has, the initial request must be granted,
+ * but the followup request must be delayed until enough capacity
+ * has been restored.
+ */
+ @Test
+ public void testAcquireExcessCapacity() {
+
+ // create a small limiter
+ final int size = 10;
+ final RateLimiting limiter = RateLimitingFactory.create(size);
+
+ // now ask for more than is allowed. This MUST work.
+ final int excess = size * 2;
+ // first attempt gets more capacity than arrives every second.
+ assertNotDelayed(limiter, excess);
+ // second attempt will block
+ assertDelayed(limiter, excess);
+ // third attempt will block
+ assertDelayed(limiter, size);
+ // as these are short-cut, no delays.
+ assertNotDelayed(limiter, 0);
+ }
+
+ @Test
+ public void testIORateLimiterWithLimitedCapacity() {
+ final int size = 10;
+ final IORateLimiter limiter =
IORateLimiterSupport.createIORateLimiter(size);
+ // this size will use more than can be allocated in a second.
+ final int excess = size * 2;
+ // first attempt gets more capacity than arrives every second.
+ assertNotDelayed(limiter, OP_DELETE_DIR, excess);
+ // second attempt will block
+ assertDelayed(limiter, OP_DELETE_BULK, excess);
+ // third attempt will block
+ assertDelayed(limiter, OP_DELETE, size);
+ // as zero capacity requests are short-cut, no delays, ever.
+ assertNotDelayed(limiter, "", 0);
+ }
+
+ /**
+ * Verify the unlimited rate limiter really is unlimited.
+ */
+ @Test
+ public void testIORateLimiterWithUnlimitedCapacity() {
+ final IORateLimiter limiter = IORateLimiterSupport.unlimited();
+ // this size will use more than can be allocated in a second.
+
+ assertNotDelayed(limiter, "1", 100_000);
+ assertNotDelayed(limiter, "2", 100_000);
+ }
+
+ @Test
+ public void testUnlimitedRejectsNegativeCapacity() throws Exception {
+ intercept(IllegalArgumentException.class, () ->
+ IORateLimiterSupport.unlimited().acquireIOCapacity("", ROOT, ROOT,
-1));
+ }
+
+ @Test
+ public void testUnlimitedRejectsNullOperation() throws Exception {
+ intercept(IllegalArgumentException.class, () ->
+ IORateLimiterSupport.unlimited().acquireIOCapacity(null, ROOT, null,
0));
+ }
+
+ @Test
+ public void testUnlimitedRejectsNullSource() throws Exception {
+ intercept(IllegalArgumentException.class, () ->
+ IORateLimiterSupport.unlimited().acquireIOCapacity("", null, null, 1));
+ }
+
+ /**
+ * Assert that a request for a given capacity is not delayed.
+ * The duration returned by the acquire call must be exactly zero.
+ * @param limiter limiter
+ * @param capacity capacity
+ */
+ private static void assertNotDelayed(final RateLimiting limiter, final int
capacity) {
+ assertZeroDuration(capacity, limiter.acquire(capacity));
+ }
+
+ /**
+ * Assert that a request for a given capacity is delayed.
+ * There's no assertion on the duration, only that it is greater than 0.
+ * @param limiter limiter
+ * @param capacity capacity
+ */
+ private static void assertDelayed(final RateLimiting limiter, final int
capacity) {
+ assertNonZeroDuration(capacity, limiter.acquire(capacity));
+ }
+
+ /**
+ * Assert that a request for a given capacity is not delayed.
+ * @param limiter limiter
+ * @param op operation
+ * @param capacity capacity
+ */
+ private static void assertNotDelayed(IORateLimiter limiter, String op, int
capacity) {
+ assertZeroDuration(capacity, limiter.acquireIOCapacity(op, ROOT, null,
capacity));
+ }
+
+ /**
+ * Assert that a request for a given capacity is delayed.
+ * There's no assertion on the duration, only that it is greater than 0.
+ * @param limiter limiter
+ * @param op operation
+ * @param capacity capacity
+ */
+ private static void assertDelayed(IORateLimiter limiter, String op, int
capacity) {
+ assertNonZeroDuration(capacity, limiter.acquireIOCapacity(op, ROOT, null,
capacity));
+ }
+
+ /**
+ * Assert that duration was not zero.
+ * @param capacity capacity requested
+ * @param duration duration
+ */
+ private static void assertNonZeroDuration(final int capacity, final Duration
duration) {
+ LOG.info("Delay for {} capacity: {}", capacity, duration);
+ Assertions.assertThat(duration)
+ .describedAs("delay for %d capacity", capacity)
+ .isGreaterThan(Duration.ZERO);
+ }
+
+ /**
+ * Assert that duration was zero.
+ * @param capacity capacity requested
+ * @param duration duration
+ */
+ private static void assertZeroDuration(final int capacity, final Duration
duration) {
+ Assertions.assertThat(duration)
+ .describedAs("delay for %d capacity", capacity)
+ .isEqualTo(Duration.ZERO);
+ }
+}
Review Comment:
Nit: EOF warning.
> [ABFS, S3A] Add IORateLimiter api to hadoop common
> --------------------------------------------------
>
> Key: HADOOP-19140
> URL: https://issues.apache.org/jira/browse/HADOOP-19140
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs, fs/azure, fs/s3
> Affects Versions: 3.4.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Minor
> Labels: pull-request-available
>
> Create a rate limiter API in hadoop common through which code (initially the
> manifest committer and bulk delete) can request IO capacity for a specific
> operation. This can be exported by filesystems to support shared rate
> limiting across all threads.
> Pulled from the HADOOP-19093 PR.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]