This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 652e49a8bb Add separate thread pool for Secondary Index building so it
doesn't block compactions
652e49a8bb is described below
commit 652e49a8bb2f8849ce0ebb5161c30e3889c14608
Author: Josh McKenzie <[email protected]>
AuthorDate: Wed Jul 27 12:57:28 2022 -0400
Add separate thread pool for Secondary Index building so it doesn't block
compactions
Patch by Chris Lohfink; reviewed by Caleb Rackliffe, Josh McKenzie, Sam
Tunnicliffe, and Marcus Eriksson for CASSANDRA-17781
Co-authored-by: Chris Lohfink <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 3 +++
.../apache/cassandra/config/DatabaseDescriptor.java | 5 +++++
.../cassandra/db/compaction/CompactionManager.java | 21 ++++++++++++++++++---
4 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 105522af5b..4dda88e314 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Add separate thread pool for Secondary Index building so it doesn't block
compactions (CASSANDRA-17781)
* Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
* When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to
know when to check the ring but checks that the ring wasn't changed in
-Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we
publish load stats (CASSANDRA-17776)
* When bootstrap fails, CassandraRoleManager may attempt to do read queries
that fail with "Cannot read from a bootstrapping node", and increments
unavailables counters (CASSANDRA-17754)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index a17c3591ce..f7eabff8de 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -327,6 +327,9 @@ public class Config
public volatile int concurrent_materialized_view_builders = 1;
public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
+ // The number of executors to use for building secondary indexes
+ public int concurrent_index_builders = 2;
+
/**
* @deprecated retry support removed on CASSANDRA-10992
*/
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index b60321e131..0af1ef808f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2033,6 +2033,11 @@ public class DatabaseDescriptor
return conf.concurrent_validations;
}
+ public static int getConcurrentIndexBuilders()
+ {
+ return conf.concurrent_index_builders;
+ }
+
public static void setConcurrentValidations(int value)
{
value = value > 0 ? value : Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 925d900ada..49b999e4db 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -129,7 +129,13 @@ public class CompactionManager implements
CompactionManagerMBean
private final CompactionExecutor cacheCleanupExecutor = new
CacheCleanupExecutor();
private final CompactionExecutor viewBuildExecutor = new
ViewBuildExecutor();
- private final CompactionMetrics metrics = new CompactionMetrics(executor,
validationExecutor, viewBuildExecutor);
+ // We can't house 2i builds in SecondaryIndexManagement because it could
cause deadlocks with itself, and can cause
+ // massive to indefinite pauses if prioritized either before or after
normal compactions so we instead put it in its
+ // own pool to prevent either scenario.
+ private final SecondaryIndexExecutor secondaryIndexExecutor = new
SecondaryIndexExecutor();
+
+ private final CompactionMetrics metrics = new CompactionMetrics(executor,
validationExecutor, viewBuildExecutor, secondaryIndexExecutor);
+
@VisibleForTesting
final Multiset<ColumnFamilyStore> compactingCF =
ConcurrentHashMultiset.create();
@@ -244,6 +250,7 @@ public class CompactionManager implements
CompactionManagerMBean
validationExecutor.shutdown();
viewBuildExecutor.shutdown();
cacheCleanupExecutor.shutdown();
+ secondaryIndexExecutor.shutdown();
// interrupt compactions and validations
for (Holder compactionHolder : active.getCompactions())
@@ -254,7 +261,8 @@ public class CompactionManager implements
CompactionManagerMBean
// wait for tasks to terminate
// compaction tasks are interrupted above, so it shuold be fairy quick
// until not interrupted tasks to complete.
- for (ExecutorService exec : Arrays.asList(executor,
validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
+ for (ExecutorService exec : Arrays.asList(executor,
validationExecutor, viewBuildExecutor,
+ cacheCleanupExecutor,
secondaryIndexExecutor))
{
try
{
@@ -1772,7 +1780,7 @@ public class CompactionManager implements
CompactionManagerMBean
}
};
- return executor.submitIfRunning(runnable, "index build");
+ return secondaryIndexExecutor.submitIfRunning(runnable, "index build");
}
/**
@@ -2015,6 +2023,13 @@ public class CompactionManager implements
CompactionManagerMBean
metrics.sstablesDropppedFromCompactions.inc(num);
}
+ private static class SecondaryIndexExecutor extends CompactionExecutor
+ {
+ public SecondaryIndexExecutor()
+ {
+ super(DatabaseDescriptor.getConcurrentIndexBuilders(),
"SecondaryIndexExecutor", Integer.MAX_VALUE);
+ }
+ }
public List<Map<String, String>> getCompactions()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]