This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new abeaa3e  Fix ConnectionTest.testAcquireReleaseOutbound
abeaa3e is described below

commit abeaa3ea5ef99691cc1b29787cfcd573a90e34fb
Author: yifan-c <yc25c...@gmail.com>
AuthorDate: Tue Jan 28 11:12:30 2020 -0800

    Fix ConnectionTest.testAcquireReleaseOutbound
    
    patch by Yifan Cai; reviewed by Benedict for CASSANDRA-15308
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/net/OutboundConnection.java   |  10 +-
 .../org/apache/cassandra/net/ConnectionTest.java   | 101 ++++++++++++++++-----
 3 files changed, 86 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5858c19..0bc3317 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 4.0-alpha4
  * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600)
+ * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308)
  * Include finalized pending sstables in preview repair (CASSANDRA-15553)
  * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271)
  * Correct inaccurate logging message (CASSANDRA-15549)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 63b909c..9661e8e 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Stream;
 
 import javax.annotation.Nullable;
 
@@ -1722,8 +1723,15 @@ public class OutboundConnection
         releaseCapacity(1, amount);
     }
 
+    @VisibleForTesting
+    void unsafeReleaseCapacity(long count, long amount)
+    {
+        releaseCapacity(count, amount);
+    }
+
+    @VisibleForTesting
     Limit unsafeGetEndpointReserveLimits()
     {
         return reserveCapacityInBytes.endpoint;
     }
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java b/test/unit/org/apache/cassandra/net/ConnectionTest.java
index 7b69cb9..d4ec84c 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java
@@ -25,11 +25,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -766,40 +768,89 @@ public class ConnectionTest
     @Test
     public void testAcquireReleaseOutbound() throws Throwable
     {
+        // In each test round, K capacity is reserved upfront.
+        // Two groups of threads release and acquire K capacity in total, respectively,
+        //   i.e. if only the release threads run, at the end, the reserved capacity is 0 (K - K).
+        // During the test, we expect N (N <= maxFailures) acquire attempts (for M capacity) to fail.
+        // The reserved capacity (pendingBytes) at the end of the round should equal K - N * M,
+        //   which you can find in the assertion.
         test((inbound, outbound, endpoint) -> {
-            ExecutorService executor = Executors.newFixedThreadPool(100);
-            int acquireStep = 123;
-            Assert.assertTrue(outbound.unsafeAcquireCapacity(100 * 10000, 100 
* 10000 * acquireStep));
+            // max capacity equals the permit-free sendQueueCapacity + the minimum of the endpoint and global reserves
+            double maxSendQueueCapacity = 
outbound.settings().applicationSendQueueCapacityInBytes +
+                                          
Double.min(outbound.settings().applicationSendQueueReserveEndpointCapacityInBytes,
+                                                     
outbound.settings().applicationSendQueueReserveGlobalCapacityInBytes.limit());
+            int concurrency = 100;
+            int attempts = 10000;
+            int acquireCount = concurrency * attempts;
+            long acquireStep = Math.round(maxSendQueueCapacity * 1.2 / acquireCount / 2); // It is guaranteed to acquire (~20%) more
+            // The total over-acquired amount divided by the amount acquired in each step. Take the ceiling so as not to miss the acquire that just exceeds.
+            long maxFailures = (long) Math.ceil((acquireCount * acquireStep * 2 - maxSendQueueCapacity) / acquireStep); // The result must be in the range of long
             AtomicLong acquisitionFailures = new AtomicLong();
-            for (int i = 0; i < 100; i++)
-            {
-                executor.submit(() -> {
-                    for (int j = 0; j < 10000; j++)
-                    {
-                        if (!outbound.unsafeAcquireCapacity(acquireStep))
-                            acquisitionFailures.incrementAndGet();
-                    }
+            Runnable acquirer = () -> {
+                for (int j = 0; j < attempts; j++)
+                {
+                    if (!outbound.unsafeAcquireCapacity(acquireStep))
+                        acquisitionFailures.incrementAndGet();
+                }
+            };
+            Runnable releaser = () -> {
+                for (int j = 0; j < attempts; j++)
+                    outbound.unsafeReleaseCapacity(acquireStep);
+            };
 
-                });
-            }
+            // Start N acquirers and N releasers to contend for capacity
+            List<Runnable> submitOrder = new ArrayList<>(concurrency * 2);
+            for (int i = 0 ; i < concurrency ; ++i)
+                submitOrder.add(acquirer);
+            for (int i = 0 ; i < concurrency ; ++i)
+                submitOrder.add(releaser);
+            // randomize their start order
+            randomize(submitOrder);
 
-            for (int i = 0; i < 100; i++)
+            try
             {
-                executor.submit(() -> {
-                    for (int j = 0; j < 10000; j++)
-                        outbound.unsafeReleaseCapacity(acquireStep);
-                });
+                // Reserve enough capacity upfront to ensure the releaser threads cannot release more than has been reserved,
+                // i.e. pendingBytes never goes negative during the test.
+                Assert.assertTrue("Unable to reserve enough capacity",
+                                  outbound.unsafeAcquireCapacity(acquireCount, 
acquireCount * acquireStep));
+                ExecutorService executor = 
Executors.newFixedThreadPool(concurrency);
+
+                submitOrder.forEach(executor::submit);
+
+                executor.shutdown();
+                executor.awaitTermination(10, TimeUnit.SECONDS);
+
+                Assert.assertEquals(acquireCount * acquireStep - 
(acquisitionFailures.get() * acquireStep), outbound.pendingBytes());
+                Assert.assertEquals(acquireCount - acquisitionFailures.get(), 
outbound.pendingCount());
+                Assert.assertTrue(String.format("acquisitionFailures should be 
capped by maxFailure. acquisitionFailures: %d, acquisitionFailures: %d",
+                                                maxFailures, 
acquisitionFailures.get()),
+                                  acquisitionFailures.get() <= maxFailures);
+            }
+            finally
+            {   // release the acquired capacity from this round
+                outbound.unsafeReleaseCapacity(outbound.pendingCount(), 
outbound.pendingBytes());
             }
-
-            executor.shutdown();
-            executor.awaitTermination(10, TimeUnit.SECONDS);
-
-            // We can release more than we acquire, which certainly should not 
happen in
-            // real life, but since it's a test just for acquisition and 
release, it is fine
-            Assert.assertEquals(100 * 10000 * acquireStep - 
(acquisitionFailures.get() * acquireStep), outbound.pendingBytes());
         });
     }
 
+    private static <V> void randomize(List<V> list)
+    {
+        long seed = ThreadLocalRandom.current().nextLong();
+        logger.info("Seed used for randomize: " + seed);
+        Random random = new Random(seed);
+        switch (random.nextInt(3))
+        {
+            case 0:
+                Collections.shuffle(list, random);
+                break;
+            case 1:
+                Collections.reverse(list);
+                break;
+            case 2:
+                // leave as is
+        }
+    }
+
     private void connect(OutboundConnection outbound) throws Throwable
     {
         CountDownLatch latch = new CountDownLatch(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to