This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 1d615a5 Fix index summary redistribution cancellation
1d615a5 is described below
commit 1d615a5ca151b7dbf24a9367f1379247cb90a5b0
Author: Marcus Eriksson <[email protected]>
AuthorDate: Wed Mar 6 09:43:26 2019 +0100
Fix index summary redistribution cancellation
Patch by marcuse; reviewed by Jordan West for CASSANDRA-15045
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionManager.java | 3 ++-
.../cassandra/io/sstable/IndexSummaryManager.java | 18 +++++++++++--
.../org/apache/cassandra/utils/FBUtilities.java | 21 +++++++++++++++
.../io/sstable/IndexSummaryManagerTest.java | 30 +++++++++++++++++++++-
5 files changed, 69 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 76fb6dd..e887733 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.15
+ * Fix index summary redistribution cancellation (CASSANDRA-15045)
* Refactor Circle CI configuration (CASSANDRA-14806)
* Fixing invalid CQL in security documentation (CASSANDRA-15020)
* Make tools/bin/token-generator py2/3 compatible (CASSANDRA-15012)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7871b17..bd34b1f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1751,7 +1751,8 @@ public class CompactionManager implements CompactionManagerMBean
if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
continue;
- if (Iterables.contains(columnFamilies, info.getCFMetaData()))
+ // cfmetadata is null for index summary redistributions which are 'global' - they involve all keyspaces/tables
+ if (info.getCFMetaData() == null || Iterables.contains(columnFamilies, info.getCFMetaData()))
compactionHolder.stop(); // signal compaction to stop
}
}
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 586e4f2..3ebbb6e 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -37,11 +37,13 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -223,10 +225,22 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
compactingAndNonCompacting.right,
this.memoryPoolBytes));
}
+ catch (Exception e)
+ {
+ if (!(e instanceof CompactionInterruptedException))
+ logger.error("Got exception during index summary redistribution", e);
+ throw e;
+ }
finally
{
- for (LifecycleTransaction modifier : compactingAndNonCompacting.right.values())
- modifier.close();
+ try
+ {
+ FBUtilities.closeAll(compactingAndNonCompacting.right.values());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 4c84743..69b7b5f 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -866,6 +866,27 @@ public class FBUtilities
digest.update((byte) ((val >>> 0) & 0xFF));
}
+ public static void closeAll(Collection<? extends AutoCloseable> l) throws Exception
+ {
+ Exception toThrow = null;
+ for (AutoCloseable c : l)
+ {
+ try
+ {
+ c.close();
+ }
+ catch (Exception e)
+ {
+ if (toThrow == null)
+ toThrow = e;
+ else
+ toThrow.addSuppressed(e);
+ }
+ }
+ if (toThrow != null)
+ throw toThrow;
+ }
+
public static byte[] toWriteUTFBytes(String s)
{
try
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index a1c0e77..385e88a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -592,6 +592,34 @@ public class IndexSummaryManagerTest
@Test
public void testCancelIndex() throws Exception
{
+ testCancelIndexHelper(new CancelFunction()
+ {
+ public void cancel(ColumnFamilyStore cfs)
+ {
+ CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+ }
+ });
+ }
+
+ @Test
+ public void testCancelIndexInterrupt() throws Exception
+ {
+ testCancelIndexHelper(new CancelFunction()
+ {
+ public void cancel(ColumnFamilyStore cfs)
+ {
+ CompactionManager.instance.interruptCompactionFor(Collections.singleton(cfs.metadata), false);
+ }
+ });
+ }
+
+ private static interface CancelFunction
+ {
+ void cancel(ColumnFamilyStore cfs);
+ }
+
+ public void testCancelIndexHelper(CancelFunction cf) throws Exception
+ {
String ksname = KEYSPACE1;
String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
Keyspace keyspace = Keyspace.open(ksname);
@@ -642,7 +670,7 @@ public class IndexSummaryManagerTest
// to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
// is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
// to proceed until stopCompaction has been called.
- CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+ cf.cancel(cfs);
// allows the redistribution to proceed
barrier.countDown();
t.join();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]