Updated Branches:
  refs/heads/cassandra-1.2 21f63a916 -> aa90c88be
  refs/heads/trunk 0467c19d9 -> f112f992b


merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f112f992
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f112f992
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f112f992

Branch: refs/heads/trunk
Commit: f112f992bb7207435cf083c1a072ee9d0776906a
Parents: d9d9d65 aa90c88
Author: Jonathan Ellis <[email protected]>
Authored: Thu Jan 31 22:51:18 2013 -0600
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Jan 31 22:51:18 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |    7 +
 .../org/apache/cassandra/config/KSMetaData.java    |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  104 ++++++++++-----
 src/java/org/apache/cassandra/db/SystemTable.java  |   93 +++++++++++--
 .../cassandra/db/compaction/CompactionTask.java    |    8 +
 .../io/sstable/AbstractSSTableSimpleWriter.java    |    2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |    9 ++
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |  101 +++++++++-----
 .../org/apache/cassandra/db/SystemTableTest.java   |    1 +
 .../cassandra/db/compaction/CompactionsTest.java   |   36 +++++
 11 files changed, 281 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e539f5b,faefd55..004bd3e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 +1.3
 + * make index_interval configurable per columnfamily (CASSANDRA-3961)
 + * add default_tim_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 +
  1.2.2
+  * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
   * fix symlinks under data dir not working (CASSANDRA-5185)
   * fix bug in compact storage metadata handling (CASSANDRA-5189)
   * Validate login for USE queries (CASSANDRA-5207)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 03db0df,0769d5c..7afaf96
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -41,10 -41,10 +41,11 @@@ import org.apache.cassandra.cache.IRowC
  import org.apache.cassandra.cache.RowCacheKey;
  import org.apache.cassandra.cache.RowCacheSentinel;
  import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
- import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.*;
+ import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
+ import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.commitlog.ReplayPosition;
@@@ -1366,7 -1409,7 +1407,6 @@@ public class ColumnFamilyStore implemen
      /**
        * Iterate over a range of rows and columns from memtables/sstables.
        *
--      * @param superColumn optional SuperColumn to slice subcolumns of; null 
to slice top-level columns
        * @param range Either a Bounds, which includes start key, or a Range, 
which does not.
        * @param columnFilter description of the columns we're interested in 
for each row
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemTable.java
index 8752eab,4f6234b..c1ede5b
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@@ -38,25 -39,19 +39,18 @@@ import org.apache.cassandra.config.Sche
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.db.filter.QueryFilter;
- import org.apache.cassandra.db.marshal.AsciiType;
- import org.apache.cassandra.db.marshal.BytesType;
- import org.apache.cassandra.db.marshal.UTF8Type;
 -import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.SSTableReader;
  import org.apache.cassandra.io.util.DataOutputBuffer;
- import org.apache.cassandra.io.util.FastByteArrayOutputStream;
  import org.apache.cassandra.locator.IEndpointSnitch;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.thrift.Constants;
 +import org.apache.cassandra.thrift.cassandraConstants;
- import org.apache.cassandra.utils.ByteBufferUtil;
- import org.apache.cassandra.utils.CounterId;
- import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.*;
  
  import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
  
@@@ -186,6 -182,74 +181,74 @@@ public class SystemTabl
          }
      }
  
+     /**
+      * Write compaction log, except columfamilies under system keyspace.
+      *
+      * @param cfs
+      * @param toCompact sstables to compact
+      * @return compaction task id or null if cfs is under system keyspace
+      */
+     public static UUID startCompaction(ColumnFamilyStore cfs, 
Iterable<SSTableReader> toCompact)
+     {
 -        if (Table.SYSTEM_KS.equals(cfs.table.name))
++        if (Table.SYSTEM_KS.equals(cfs.table.getName()))
+             return null;
+ 
+         UUID compactionId = UUIDGen.getTimeUUID();
+         String req = "INSERT INTO system.%s (id, keyspace_name, 
columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+         Iterable<Integer> generations = Iterables.transform(toCompact, new 
Function<SSTableReader, Integer>()
+         {
+             public Integer apply(SSTableReader sstable)
+             {
+                 return sstable.descriptor.generation;
+             }
+         });
 -        processInternal(String.format(req, COMPACTION_LOG, compactionId, 
cfs.table.name, cfs.columnFamily, StringUtils.join(generations.iterator(), 
',')));
++        processInternal(String.format(req, COMPACTION_LOG, compactionId, 
cfs.table.getName(), cfs.name, StringUtils.join(generations.iterator(), ',')));
+         forceBlockingFlush(COMPACTION_LOG);
+         return compactionId;
+     }
+ 
+     public static void finishCompaction(UUID taskId)
+     {
+         assert taskId != null;
+ 
+         String req = "DELETE FROM system.%s WHERE id = %s";
+         processInternal(String.format(req, COMPACTION_LOG, taskId));
+         forceBlockingFlush(COMPACTION_LOG);
+     }
+ 
+     /**
+      * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+      */
+     public static SetMultimap<Pair<String, String>, Integer> 
getUnfinishedCompactions()
+     {
+         String req = "SELECT * FROM system.%s";
+         UntypedResultSet resultSet = processInternal(String.format(req, 
COMPACTION_LOG));
+ 
+         SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = 
HashMultimap.create();
+         for (UntypedResultSet.Row row : resultSet)
+         {
+             String keyspace = row.getString("keyspace_name");
+             String columnfamily = row.getString("columnfamily_name");
+             Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+ 
+             unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), 
inputs);
+         }
+         return unfinishedCompactions;
+     }
+ 
+     public static void discardCompactionsInProgress()
+     {
+         ColumnFamilyStore compactionLog = 
Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+         try
+         {
+             compactionLog.truncate().get();
+         }
+         catch (Exception e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
      public static void saveTruncationPosition(ColumnFamilyStore cfs, 
ReplayPosition position)
      {
          String req = "UPDATE system.%s SET truncated_at = truncated_at + %s 
WHERE key = '%s'";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 458fbc5,ca1e2da..9db0a1a
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -109,8 -110,10 +110,10 @@@ public class CompactionTask extends Abs
  
          // sanity check: all sstables must belong to the same cfs
          for (SSTableReader sstable : toCompact)
 -            assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 +            assert sstable.descriptor.cfname.equals(cfs.name);
  
+         UUID taskId = SystemTable.startCompaction(cfs, toCompact);
+ 
          CompactionController controller = new CompactionController(cfs, 
toCompact, gcBefore);
          // new sstables from flush can be added during a compaction, but only 
the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of 
determining if we're compacting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index e77277f,8048fcb..e92c4b7
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -43,42 -33,42 +33,31 @@@ import org.apache.commons.lang.ArrayUti
  import org.apache.commons.lang.StringUtils;
  import org.junit.Test;
  
--import static org.junit.Assert.assertNull;
--import static junit.framework.Assert.assertEquals;
--import static junit.framework.Assert.assertSame;
--import static junit.framework.Assert.assertTrue;
--import static org.apache.cassandra.Util.column;
--import static org.apache.cassandra.Util.dk;
--import static org.apache.cassandra.Util.getBytes;
--import static org.apache.cassandra.Util.rp;
--import static org.apache.cassandra.db.TableTest.assertColumns;
--import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
--import static org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY;
--
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
  import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.index.SecondaryIndex;
  import org.apache.cassandra.db.marshal.CompositeType;
  import org.apache.cassandra.db.marshal.LexicalUUIDType;
  import org.apache.cassandra.db.marshal.LongType;
--import org.apache.cassandra.dht.Bounds;
--import org.apache.cassandra.dht.ExcludingBounds;
--import org.apache.cassandra.dht.IPartitioner;
--import org.apache.cassandra.dht.IncludingExcludingBounds;
--import org.apache.cassandra.dht.Range;
- import org.apache.cassandra.io.sstable.Component;
- import org.apache.cassandra.io.sstable.Descriptor;
- import org.apache.cassandra.io.sstable.SSTable;
- import org.apache.cassandra.io.sstable.SSTableReader;
++import org.apache.cassandra.dht.*;
++import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.*;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.thrift.*;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.WrappedRunnable;
  
++import static junit.framework.Assert.*;
++import static org.apache.cassandra.Util.*;
++import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
++import static org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY;
++import static org.junit.Assert.assertNull;
++
  public class ColumnFamilyStoreTest extends SchemaLoader
  {
      static byte[] bytes1, bytes2;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f112f992/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------

Reply via email to