This is an automated email from the ASF dual-hosted git repository.
bereng pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 23ad7c3 Flaky ActiveRepairServiceTest.testRejectWhenPoolFullStrategy
new 1676add Merge branch 'cassandra-4.0.0' into cassandra-4.0
23ad7c3 is described below
commit 23ad7c301e227d5ea88cea0784b32e6351603912
Author: Bereng <[email protected]>
AuthorDate: Fri Jun 11 07:52:05 2021 +0200
Flaky ActiveRepairServiceTest.testRejectWhenPoolFullStrategy
patch by Berenguer Blasi; reviewed by Andres de la Peña and Michael Semb Wever for CASSANDRA-16685
---
.../cassandra/service/ActiveRepairServiceTest.java | 40 +++++++++++++++++-----
1 file changed, 32 insertions(+), 8 deletions(-)
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index d0a367a..ad680f5 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -18,19 +18,27 @@
*/
package org.apache.cassandra.service;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -55,8 +63,8 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.repair.messages.RepairOption;
-import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -77,6 +85,7 @@ public class ActiveRepairServiceTest
public static final String KEYSPACE5 = "Keyspace5";
public static final String CF_STANDARD1 = "Standard1";
public static final String CF_COUNTER = "Counter1";
+ public static final int TASK_SECONDS = 10;
public String cfname;
public ColumnFamilyStore store;
@@ -376,8 +385,20 @@ public class ActiveRepairServiceTest
{
Condition blocked = new SimpleCondition();
CountDownLatch completed = new CountDownLatch(2);
+
+ /*
+ * CASSANDRA-16685 This is a Java bug. When the underlying executor's queue is a SynchronousQueue, there can
+ * be races just after the ThreadPool's initialization while juggling and spinning up threads internally
+ * leading to false rejections. That queue needs a thread ready to pick up the task immediately or it will
+ * produce a reject exception upon 'offer()' method call on the executor's code. If the executor is still
+ * initializing or threads are not ready to take work you can get false rejections.
+ *
+ * A sleep has been added to give time to the thread pool to be ready to get work.
+ */
+ Thread.sleep(250);
validationExecutor.submit(new Task(blocked, completed));
validationExecutor.submit(new Task(blocked, completed));
+
try
{
validationExecutor.submit(new Task(blocked, completed));
@@ -387,10 +408,13 @@ public class ActiveRepairServiceTest
{
// expected
}
+
// allow executing tests to complete
blocked.signalAll();
- completed.await(10, TimeUnit.SECONDS);
+ completed.await(TASK_SECONDS + 1, TimeUnit.SECONDS);
+
// Submission is unblocked
+ Thread.sleep(250);
validationExecutor.submit(() -> {});
}
finally
@@ -425,7 +449,7 @@ public class ActiveRepairServiceTest
}
// Make sure all tasks have been submitted to the validation executor
- allSubmitted.await(10, TimeUnit.SECONDS);
+ allSubmitted.await(TASK_SECONDS + 1, TimeUnit.SECONDS);
// Give the tasks we expect to execute immediately chance to be scheduled
Util.spinAssertEquals(2 , ((DebuggableThreadPoolExecutor) validationExecutor)::getActiveTaskCount, 1);
@@ -436,7 +460,7 @@ public class ActiveRepairServiceTest
Assert.assertEquals(3, ((DebuggableThreadPoolExecutor) validationExecutor).getPendingTaskCount());
// allow executing tests to complete
blocked.signalAll();
- completed.await(10, TimeUnit.SECONDS);
+ completed.await(TASK_SECONDS + 1, TimeUnit.SECONDS);
}
finally
{
@@ -458,7 +482,7 @@ public class ActiveRepairServiceTest
public void run()
{
- Uninterruptibles.awaitUninterruptibly(blocked, 10, TimeUnit.SECONDS);
+ Uninterruptibles.awaitUninterruptibly(blocked, TASK_SECONDS, TimeUnit.SECONDS);
complete.countDown();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]