Try to stop all compaction upon Keyspace or ColumnFamily drop. Patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4221.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d9b7f55 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d9b7f55 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d9b7f55 Branch: refs/heads/trunk Commit: 1d9b7f5597f2cd090e312e270431ab56c81de9d9 Parents: 46722cc Author: Pavel Yaskevich <[email protected]> Authored: Mon May 28 20:59:14 2012 +0300 Committer: Pavel Yaskevich <[email protected]> Committed: Mon May 28 20:59:14 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 7 +-- src/java/org/apache/cassandra/db/DefsTable.java | 6 ++- .../db/compaction/AbstractCompactionIterable.java | 5 -- .../cassandra/db/compaction/CompactionInfo.java | 41 +++++++++------ .../cassandra/db/compaction/CompactionManager.java | 24 +++++++-- .../cassandra/db/index/SecondaryIndexBuilder.java | 2 - .../org/apache/cassandra/utils/FBUtilities.java | 12 ++++ 8 files changed, 64 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0156fe2..2709320 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -63,6 +63,7 @@ * (cql3) Adds simple access to column timestamp and ttl (CASSANDRA-4217) * (cql3) Fix range queries with secondary indexes (CASSANDRA-4257) * Better error messages from improper input in cli (CASSANDRA-3865) + * Try to stop all compaction upon Keyspace or ColumnFamily drop (CASSANDRA-4221) Merged from 1.0: * Fix super columns bug where cache is not updated (CASSANDRA-4190) * fix maxTimestamp to include row tombstones (CASSANDRA-4116) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index aff7aa8..659e9ec 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -192,12 +192,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K else type = OperationType.UNKNOWN; - info = new CompactionInfo(this.hashCode(), - "Global", - cacheType.toString(), - type, - 0, - estimatedTotalBytes); + info = new CompactionInfo(type, 0, estimatedTotalBytes); } public CompactionInfo getCompactionInfo() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java index 1b37de1..be23934 100644 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@ -35,6 +35,7 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; import org.apache.cassandra.config.*; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.AsciiType; @@ -43,7 +44,6 @@ import org.apache.cassandra.db.migration.avro.KsDef; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.CfDef; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -474,6 +474,8 @@ public class DefsTable KSMetaData ksm = 
Schema.instance.getTableDefinition(ksName); String snapshotName = Table.getTimestampedSnapshotName(ksName); + CompactionManager.instance.stopCompactionFor(ksm.cfMetaData().values()); + // remove all cfs from the table instance. for (CFMetaData cfm : ksm.cfMetaData().values()) { @@ -507,6 +509,8 @@ public class DefsTable Schema.instance.purge(cfm); Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm)); + CompactionManager.instance.stopCompactionFor(Arrays.asList(cfm)); + if (!StorageService.instance.isClientMode()) { if (DatabaseDescriptor.isAutoSnapshot()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java index 8976f4e..1eb4e9b 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java @@ -21,14 +21,11 @@ package org.apache.cassandra.db.compaction; */ -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.CloseableIterator; public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow> @@ -57,8 +54,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i public CompactionInfo getCompactionInfo() { return new CompactionInfo(this.hashCode(), - controller.getKeyspace(), - controller.getColumnFamily(), type, bytesRead, totalBytes); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index 937557f..17d098b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -22,46 +22,55 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; + /** Implements serializable to allow structured info to be returned via JMX. */ public final class CompactionInfo implements Serializable { private static final long serialVersionUID = 3695381572726744816L; - private final int id; - private final String ksname; - private final String cfname; + private final CFMetaData cfm; private final OperationType tasktype; private final long bytesComplete; private final long totalBytes; - public CompactionInfo(int id, String ksname, String cfname, OperationType tasktype, long bytesComplete, long totalBytes) + public CompactionInfo(OperationType tasktype, long bytesComplete, long totalBytes) + { + this(null, tasktype, bytesComplete, totalBytes); + } + + public CompactionInfo(Integer id, OperationType tasktype, long bytesComplete, long totalBytes) { - this.id = id; - this.ksname = ksname; - this.cfname = cfname; this.tasktype = tasktype; this.bytesComplete = bytesComplete; this.totalBytes = totalBytes; + this.cfm = id == null ? null : Schema.instance.getCFMetaData(id); } /** @return A copy of this CompactionInfo with updated progress. */ public CompactionInfo forProgress(long bytesComplete, long totalBytes) { - return new CompactionInfo(id, ksname, cfname, tasktype, bytesComplete, totalBytes); + return new CompactionInfo(cfm == null ? 
null : cfm.cfId, tasktype, bytesComplete, totalBytes); } - public int getId() + public Integer getId() { - return id; + return cfm == null ? null : cfm.cfId; } public String getKeyspace() { - return ksname; + return cfm == null ? null : cfm.ksName; } public String getColumnFamily() { - return cfname; + return cfm == null ? null : cfm.cfName; + } + + public CFMetaData getCFMetaData() + { + return cfm; } public long getBytesComplete() @@ -82,7 +91,7 @@ public final class CompactionInfo implements Serializable public String toString() { StringBuilder buff = new StringBuilder(); - buff.append(getTaskType()).append('@').append(id); + buff.append(getTaskType()).append('@').append(getId()); buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily()); buff.append(", ").append(getBytesComplete()).append('/').append(getTotalBytes()); return buff.append(')').toString(); @@ -91,9 +100,9 @@ public final class CompactionInfo implements Serializable public Map<String, String> asMap() { Map<String, String> ret = new HashMap<String, String>(); - ret.put("id", Integer.toString(id)); - ret.put("keyspace", ksname); - ret.put("columnfamily", cfname); + ret.put("id", Integer.toString(getId())); + ret.put("keyspace", getKeyspace()); + ret.put("columnfamily", getColumnFamily()); ret.put("bytesComplete", Long.toString(bytesComplete)); ret.put("totalBytes", Long.toString(totalBytes)); ret.put("taskType", tasktype.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index e09a012..38264f5 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -34,6 +34,7 @@ import 
javax.management.ObjectName; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; @@ -1204,8 +1205,6 @@ public class CompactionManager implements CompactionManagerMBean try { return new CompactionInfo(this.hashCode(), - sstable.descriptor.ksname, - sstable.descriptor.cfname, OperationType.CLEANUP, scanner.getCurrentPosition(), scanner.getLengthInBytes()); @@ -1232,8 +1231,6 @@ public class CompactionManager implements CompactionManagerMBean try { return new CompactionInfo(this.hashCode(), - sstable.descriptor.ksname, - sstable.descriptor.cfname, OperationType.SCRUB, dataFile.getFilePointer(), dataFile.length()); @@ -1254,4 +1251,23 @@ public class CompactionManager implements CompactionManagerMBean holder.stop(); } } + + /** + * Try to stop all of the compactions for given ColumnFamilies. + * Note that this method does not wait indefinitely for all compactions to finish, maximum wait time is 30 secs. + * + * @param columnFamilies The ColumnFamilies to try to stop compaction upon. 
+ */ + public void stopCompactionFor(Collection<CFMetaData> columnFamilies) + { + assert columnFamilies != null; + + for (Holder compactionHolder : CompactionExecutor.getCompactions()) + { + CompactionInfo info = compactionHolder.getCompactionInfo(); + + if (columnFamilies.contains(info.getCFMetaData())) + compactionHolder.stop(); // signal compaction to stop + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java index 5cdd26a..39f2c2d 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java @@ -48,8 +48,6 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder public CompactionInfo getCompactionInfo() { return new CompactionInfo(this.hashCode(), - cfs.table.name, - cfs.columnFamily, OperationType.INDEX_BUILD, iter.getBytesRead(), iter.getTotalBytes()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d9b7f55/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index fb2adfe..106bedf 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -581,6 +581,18 @@ public class FBUtilities } } + public static void sleep(int millis) + { + try + { + Thread.sleep(millis); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + private static final class WrappedCloseableIterator<T> extends AbstractIterator<T> implements CloseableIterator<T> {
