This is an automated email from the ASF dual-hosted git repository.
liuml07 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e9e1ead HADOOP-17027. Add tests for reading fair call queue capacity
weight configs. Contributed by Fengnan Li
e9e1ead is described below
commit e9e1ead089c0b9f5f1788361329a64fec6561352
Author: Mingliang Liu <[email protected]>
AuthorDate: Thu May 7 16:50:23 2020 -0700
HADOOP-17027. Add tests for reading fair call queue capacity weight
configs. Contributed by Fengnan Li
---
.../apache/hadoop/ipc/TestCallQueueManager.java | 55 ++++++++++++++++++++++
.../org/apache/hadoop/ipc/TestFairCallQueue.java | 25 ++++------
2 files changed, 65 insertions(+), 15 deletions(-)
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index bb4717e..38b3fe5 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -176,6 +176,12 @@ public class TestCallQueueManager {
private static final Class<? extends RpcScheduler> schedulerClass
= CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class);
+ private static final Class<? extends BlockingQueue<FakeCall>> fcqueueClass
+ = CallQueueManager.convertQueueClass(FairCallQueue.class,
FakeCall.class);
+
+ private static final Class<? extends RpcScheduler> rpcSchedulerClass
+ = CallQueueManager.convertSchedulerClass(DecayRpcScheduler.class);
+
@Test
public void testCallQueueCapacity() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
@@ -319,6 +325,55 @@ public class TestCallQueueManager {
assertEquals(totalCallsConsumed, totalCallsCreated);
}
+ @Test
+ public void testQueueCapacity() throws InterruptedException {
+ int capacity = 4;
+ String ns = "ipc.8020";
+ conf.setInt("ipc.8020.scheduler.priority.levels", 2);
+ conf.set("ipc.8020.callqueue.capacity.weights", "1,3");
+ manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
+ capacity, ns, conf);
+
+ // insert 4 calls with 2 at each priority
+ // since the queue with priority 0 has only 1 capacity, the second call
+ // with p0 will be overflowed to queue with priority 1
+ for (int i = 0; i < capacity; i++) {
+ FakeCall fc = new FakeCall(i);
+ fc.setPriorityLevel(i%2);
+ manager.put(fc);
+ }
+
+ // get calls, the order should be
+ // call 0 with p0
+ // call 1 with p1
+ // call 2 with p0 since overflow
+ // call 3 with p1
+ assertEquals(manager.take().priorityLevel, 0);
+ assertEquals(manager.take().priorityLevel, 1);
+ assertEquals(manager.take().priorityLevel, 0);
+ assertEquals(manager.take().priorityLevel, 1);
+
+ conf.set("ipc.8020.callqueue.capacity.weights", "1,1");
+ manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
+ capacity, ns, conf);
+
+ for (int i = 0; i < capacity; i++) {
+ FakeCall fc = new FakeCall(i);
+ fc.setPriorityLevel(i%2);
+ manager.put(fc);
+ }
+
+ // get calls, the order should be
+ // call 0 with p0
+ // call 2 with p0
+ // call 1 with p1
+ // call 3 with p1
+ assertEquals(manager.take().priorityLevel, 0);
+ assertEquals(manager.take().priorityLevel, 0);
+ assertEquals(manager.take().priorityLevel, 1);
+ assertEquals(manager.take().priorityLevel, 1);
+ }
+
public static class ExceptionFakeCall implements Schedulable {
public ExceptionFakeCall() {
throw new IllegalArgumentException("Exception caused by call queue " +
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index f478957..1fed9a3 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -170,18 +170,13 @@ public class TestFairCallQueue {
// default weights i.e. all queues share capacity
fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf);
FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>(
- numQueues, capacity, "ns", new int[]{3, 1}, conf);
+ numQueues, capacity, "ns", new int[]{1, 3}, conf);
for (int i=0; i < capacity; i++) {
Schedulable call = mockCall("u", i%2);
calls.add(call);
fcq.add(call);
fcq1.add(call);
-
- call = mockCall("u", (i++)%2);
- calls.add(call);
- fcq.add(call);
- fcq1.add(call);
}
final AtomicInteger currentIndex = new AtomicInteger();
@@ -200,24 +195,24 @@ public class TestFairCallQueue {
// either queue will have two calls
// v
- // 0 2
- // 1 3
+ // 0 1
+ // 2 3
currentIndex.set(1);
- assertSame(calls.get(2), fcq.poll());
+ assertSame(calls.get(1), fcq.poll());
assertSame(calls.get(3), fcq.poll());
assertSame(calls.get(0), fcq.poll());
- assertSame(calls.get(1), fcq.poll());
+ assertSame(calls.get(2), fcq.poll());
// queues with different number of calls
// v
- // 0 3
- // 1
- // 2
+ // 0 1
+ // 2
+ // 3
currentIndex.set(1);
- assertSame(calls.get(3), fcq1.poll());
- assertSame(calls.get(0), fcq1.poll());
assertSame(calls.get(1), fcq1.poll());
assertSame(calls.get(2), fcq1.poll());
+ assertSame(calls.get(3), fcq1.poll());
+ assertSame(calls.get(0), fcq1.poll());
}
@SuppressWarnings("unchecked")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]