Updated Branches: refs/heads/cassandra-1.2 21f63a916 -> aa90c88be refs/heads/trunk 0467c19d9 -> f112f992b
merge from 1.2 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f112f992 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f112f992 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f112f992 Branch: refs/heads/trunk Commit: f112f992bb7207435cf083c1a072ee9d0776906a Parents: d9d9d65 aa90c88 Author: Jonathan Ellis <[email protected]> Authored: Thu Jan 31 22:51:18 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jan 31 22:51:18 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 7 + .../org/apache/cassandra/config/KSMetaData.java | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 104 ++++++++++----- src/java/org/apache/cassandra/db/SystemTable.java | 93 +++++++++++-- .../cassandra/db/compaction/CompactionTask.java | 8 + .../io/sstable/AbstractSSTableSimpleWriter.java | 2 +- .../apache/cassandra/service/CassandraDaemon.java | 9 ++ .../apache/cassandra/db/ColumnFamilyStoreTest.java | 101 +++++++++----- .../org/apache/cassandra/db/SystemTableTest.java | 1 + .../cassandra/db/compaction/CompactionsTest.java | 36 +++++ 11 files changed, 281 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index e539f5b,faefd55..004bd3e --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,11 -1,5 +1,12 @@@ +1.3 + * make index_interval configurable per columnfamily (CASSANDRA-3961) + * add default_tim_to_live (CASSANDRA-3974) + * add memtable_flush_period_in_ms (CASSANDRA-4237) + * replace supercolumns internally by composites (CASSANDRA-3237, 5123) + * upgrade thrift to 0.9.0 (CASSANDRA-3719) + 1.2.2 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151) * fix symlinks under data dir not working (CASSANDRA-5185) * fix bug in compact storage metadata handling (CASSANDRA-5189) * Validate login for USE queries (CASSANDRA-5207) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 03db0df,0769d5c..7afaf96 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -41,10 -41,10 +41,11 @@@ import org.apache.cassandra.cache.IRowC import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; - import org.apache.cassandra.concurrent.NamedThreadFactory; - import org.apache.cassandra.concurrent.StageManager; - import org.apache.cassandra.config.*; + import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.CFMetaData.SpeculativeRetry; + import org.apache.cassandra.config.ColumnDefinition; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; @@@ -1366,7 -1409,7 +1407,6 @@@ public class ColumnFamilyStore implemen /** * Iterate over a range of rows and columns from memtables/sstables. * -- * @param superColumn optional SuperColumn to slice subcolumns of; null to slice top-level columns * @param range Either a Bounds, which includes start key, or a Range, which does not. * @param columnFilter description of the columns we're interested in for each row */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemTable.java index 8752eab,4f6234b..c1ede5b --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@@ -38,25 -39,19 +39,18 @@@ import org.apache.cassandra.config.Sche import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; + import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; - import org.apache.cassandra.db.marshal.AsciiType; - import org.apache.cassandra.db.marshal.BytesType; - import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.db.filter.QueryPath; + import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; + import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; - import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.Constants; +import org.apache.cassandra.thrift.cassandraConstants; - import org.apache.cassandra.utils.ByteBufferUtil; - import org.apache.cassandra.utils.CounterId; - import org.apache.cassandra.utils.FBUtilities; - import org.apache.cassandra.utils.Pair; + import org.apache.cassandra.utils.*; import static org.apache.cassandra.cql3.QueryProcessor.processInternal; @@@ -186,6 -182,74 +181,74 @@@ public class SystemTabl } } + /** + * Write compaction log, except columfamilies under system keyspace. + * + * @param cfs + * @param toCompact sstables to compact + * @return compaction task id or null if cfs is under system keyspace + */ + public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact) + { - if (Table.SYSTEM_KS.equals(cfs.table.name)) ++ if (Table.SYSTEM_KS.equals(cfs.table.getName())) + return null; + + UUID compactionId = UUIDGen.getTimeUUID(); + String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})"; + Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>() + { + public Integer apply(SSTableReader sstable) + { + return sstable.descriptor.generation; + } + }); - processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.name, cfs.columnFamily, StringUtils.join(generations.iterator(), ','))); ++ processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.getName(), cfs.name, StringUtils.join(generations.iterator(), ','))); + forceBlockingFlush(COMPACTION_LOG); + return compactionId; + } + + public static void finishCompaction(UUID taskId) + { + assert taskId != null; + + String req = "DELETE FROM system.%s WHERE id = %s"; + processInternal(String.format(req, COMPACTION_LOG, taskId)); + forceBlockingFlush(COMPACTION_LOG); + } + + /** + * @return unfinished compactions, grouped by keyspace/columnfamily pair. + */ + public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions() + { + String req = "SELECT * FROM system.%s"; + UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG)); + + SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create(); + for (UntypedResultSet.Row row : resultSet) + { + String keyspace = row.getString("keyspace_name"); + String columnfamily = row.getString("columnfamily_name"); + Set<Integer> inputs = row.getSet("inputs", Int32Type.instance); + + unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs); + } + return unfinishedCompactions; + } + + public static void discardCompactionsInProgress() + { + ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG); + try + { + compactionLog.truncate().get(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 458fbc5,ca1e2da..9db0a1a --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -109,8 -110,10 +110,10 @@@ public class CompactionTask extends Abs // sanity check: all sstables must belong to the same cfs for (SSTableReader sstable : toCompact) - assert sstable.descriptor.cfname.equals(cfs.columnFamily); + assert sstable.descriptor.cfname.equals(cfs.name); + UUID taskId = SystemTable.startCompaction(cfs, toCompact); + CompactionController controller = new CompactionController(cfs, toCompact, gcBefore); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index e77277f,8048fcb..e92c4b7 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@@ -43,42 -33,42 +33,31 @@@ import org.apache.commons.lang.ArrayUti import org.apache.commons.lang.StringUtils; import org.junit.Test; --import static org.junit.Assert.assertNull; --import static junit.framework.Assert.assertEquals; --import static junit.framework.Assert.assertSame; --import static junit.framework.Assert.assertTrue; --import static org.apache.cassandra.Util.column; --import static org.apache.cassandra.Util.dk; --import static org.apache.cassandra.Util.getBytes; --import static org.apache.cassandra.Util.rp; --import static org.apache.cassandra.db.TableTest.assertColumns; --import static org.apache.cassandra.utils.ByteBufferUtil.bytes; --import static org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY; -- import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; - import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.config.Schema; -import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.LexicalUUIDType; import org.apache.cassandra.db.marshal.LongType; --import org.apache.cassandra.dht.Bounds; --import org.apache.cassandra.dht.ExcludingBounds; --import org.apache.cassandra.dht.IPartitioner; --import org.apache.cassandra.dht.IncludingExcludingBounds; --import org.apache.cassandra.dht.Range; - import org.apache.cassandra.io.sstable.Component; - import org.apache.cassandra.io.sstable.Descriptor; - import org.apache.cassandra.io.sstable.SSTable; - import org.apache.cassandra.io.sstable.SSTableReader; ++import org.apache.cassandra.dht.*; ++import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; ++import static junit.framework.Assert.*; ++import static org.apache.cassandra.Util.*; ++import static org.apache.cassandra.utils.ByteBufferUtil.bytes; ++import static org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY; ++import static org.junit.Assert.assertNull; ++ public class ColumnFamilyStoreTest extends SchemaLoader { static byte[] bytes1, bytes2; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ----------------------------------------------------------------------
