Author: jbellis
Date: Sun May 29 03:54:32 2011
New Revision: 1128814

URL: http://svn.apache.org/viewvc?rev=1128814&view=rev
Log:
merge from 0.7

Modified:
    cassandra/branches/cassandra-0.8/   (props changed)
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/contrib/   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Column.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/ThriftValidation.java

Propchange: cassandra/branches/cassandra-0.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7:1026516-1127143
+/cassandra/branches/cassandra-0.7:1026516-1128347
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0:1125021-1127636

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Sun May 29 03:54:32 2011
@@ -24,6 +24,9 @@
  * clone super columns to avoid modifying them during flush (CASSANDRA-2675)
  * allow writes to bypass the commitlog for certain keyspaces (CASSANDRA-2683)
  * avoid NPE when bypassing commitlog during memtable flush (CASSANDRA-2781)
+ * close scrub file handles (CASSANDRA-2669)
+ * throttle migration replay (CASSANDRA-2714)
+ * optimize column serializer creation (CASSANDRA-2716)
 
 
 0.8.0-final

Propchange: cassandra/branches/cassandra-0.8/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1127143
+/cassandra/branches/cassandra-0.7/contrib:1026516-1128347
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1127636

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1127143
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1128347
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1127636

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1127143
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1128347
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1127636

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1127143
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1128347
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1127636

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1127143
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1128347
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1127636

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 29 03:54:32 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1127143
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1128347
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1127636

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java Sun May 29 03:54:32 2011
@@ -257,7 +257,7 @@ public final class CFMetaData
 
     public static CFMetaData newIndexMetadata(CFMetaData parent, 
ColumnDefinition info, AbstractType columnComparator)
     {
-        return new CFMetaData(parent.ksName, indexName(parent.cfName, info), 
ColumnFamilyType.Standard, columnComparator, null)
+        return new CFMetaData(parent.ksName, parent.indexName(info), 
ColumnFamilyType.Standard, columnComparator, null)
                              .keyCacheSize(0.0)
                              .readRepairChance(0.0)
                              .gcGraceSeconds(parent.gcGraceSeconds)
@@ -306,9 +306,9 @@ public final class CFMetaData
     }
     
     /** convention for nameing secondary indexes. */
-    public static String indexName(String parentCf, ColumnDefinition info)
+    public String indexName(ColumnDefinition info)
     {
-        return parentCf + "." + (info.getIndexName() == null ? 
ByteBufferUtil.bytesToHex(info.name) : info.getIndexName());
+        return cfName + "." + (info.getIndexName() == null ? 
comparator.getString(info.name) + "_idx" : info.getIndexName());
     }
 
     public org.apache.cassandra.db.migration.avro.CfDef deflate()

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Column.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Column.java Sun May 29 03:54:32 2011
@@ -42,10 +42,11 @@ import org.apache.cassandra.utils.ByteBu
 public class Column implements IColumn
 {
     private static Logger logger = LoggerFactory.getLogger(Column.class);
+    private static ColumnSerializer serializer = new ColumnSerializer();
 
     public static ColumnSerializer serializer()
     {
-        return new ColumnSerializer();
+        return serializer;
     }
 
     protected final ByteBuffer name;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun May 29 03:54:32 2011
@@ -511,7 +511,7 @@ public class ColumnFamilyStore implement
         if (cfm != null) // secondary indexes aren't stored in DD.
         {
             for (ColumnDefinition def : cfm.getColumn_metadata().values())
-                scrubDataDirectories(table, CFMetaData.indexName(cfm.cfName, 
def));
+                scrubDataDirectories(table, cfm.indexName(def));
         }
     }
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CompactionManager.java Sun May 29 03:54:32 2011
@@ -633,26 +633,31 @@ public class CompactionManager implement
     private void doScrub(ColumnFamilyStore cfs, Collection<SSTableReader> 
sstables) throws IOException
     {
         assert !cfs.isIndex();
-
         for (final SSTableReader sstable : sstables)
-        {
-            logger.info("Scrubbing " + sstable);
-            CompactionController controller = new CompactionController(cfs, 
Collections.singletonList(sstable), getDefaultGcBefore(cfs), true);
+            scrubOne(cfs, sstable);
+    }
 
-            // Calculate the expected compacted filesize
-            String compactionFileLocation = 
cfs.table.getDataFileLocation(sstable.length());
-            if (compactionFileLocation == null)
-                throw new IOException("disk full");
-            int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(),
-                                                   
(int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+    private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable) throws 
IOException
+    {
+        logger.info("Scrubbing " + sstable);
+        CompactionController controller = new CompactionController(cfs, 
Collections.singletonList(sstable), getDefaultGcBefore(cfs), true);
 
-            // loop through each row, deserializing to check for damage.
-            // we'll also loop through the index at the same time, using the 
position from the index to recover if the
-            // row header (key or data size) is corrupt. (This means our 
position in the index file will be one row
-            // "ahead" of the data file.)
-            final BufferedRandomAccessFile dataFile = 
BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
-            String indexFilename = 
sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
-            BufferedRandomAccessFile indexFile = 
BufferedRandomAccessFile.getUncachingReader(indexFilename);
+        // Calculate the expected compacted filesize
+        String compactionFileLocation = 
cfs.table.getDataFileLocation(sstable.length());
+        if (compactionFileLocation == null)
+            throw new IOException("disk full");
+        int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(),
+                                               
(int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+
+        // loop through each row, deserializing to check for damage.
+        // we'll also loop through the index at the same time, using the 
position from the index to recover if the
+        // row header (key or data size) is corrupt. (This means our position 
in the index file will be one row
+        // "ahead" of the data file.)
+        final BufferedRandomAccessFile dataFile = 
BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
+        String indexFilename = 
sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
+        BufferedRandomAccessFile indexFile = 
BufferedRandomAccessFile.getUncachingReader(indexFilename);
+        try
+        {
             ByteBuffer nextIndexKey = 
ByteBufferUtil.readWithShortLength(indexFile);
             {
                 // throw away variable so we don't have a side effect in the 
assert
@@ -791,6 +796,11 @@ public class CompactionManager implement
                     logger.info("Scrub of " + sstable + " complete; looks like 
all " + emptyRows + " rows were tombstoned");
             }
         }
+        finally
+        {
+            FileUtils.closeQuietly(dataFile);
+            FileUtils.closeQuietly(indexFile);
+        }
     }
 
     private void throwIfFatal(Throwable th)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SuperColumn.java Sun May 29 03:54:32 2011
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,13 +39,21 @@ import org.apache.cassandra.io.util.Colu
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 
 public class SuperColumn implements IColumn, IColumnContainer
 {
+    private static NonBlockingHashMap<Comparator, SuperColumnSerializer> 
serializers = new NonBlockingHashMap<Comparator, SuperColumnSerializer>();
     public static SuperColumnSerializer serializer(AbstractType comparator)
     {
-        return new SuperColumnSerializer(comparator);
+        SuperColumnSerializer serializer = serializers.get(comparator);
+        if (serializer == null)
+        {
+            serializer = new SuperColumnSerializer(comparator);
+            serializers.put(comparator, serializer);
+        }
+        return serializer;
     }
 
     private ByteBuffer name_;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java Sun May 29 03:54:32 2011
@@ -299,7 +299,12 @@ public abstract class Migration
         DecoratedKey dkey = 
StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY);
         Table defs = Table.open(Table.SYSTEM_TABLE);
         ColumnFamilyStore cfStore = 
defs.getColumnFamilyStore(Migration.MIGRATIONS_CF);
-        QueryFilter filter = QueryFilter.getSliceFilter(dkey, new 
QueryPath(MIGRATIONS_CF), ByteBuffer.wrap(UUIDGen.decompose(start)), 
ByteBuffer.wrap(UUIDGen.decompose(end)), false, 1000);   
+        QueryFilter filter = QueryFilter.getSliceFilter(dkey,
+                                                        new 
QueryPath(MIGRATIONS_CF),
+                                                        
ByteBuffer.wrap(UUIDGen.decompose(start)),
+                                                        
ByteBuffer.wrap(UUIDGen.decompose(end)),
+                                                        false,
+                                                        100);
         ColumnFamily cf = cfStore.getColumnFamily(filter);
         return cf.getSortedColumns();
     }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java Sun May 29 03:54:32 2011
@@ -24,10 +24,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.net.CachingMessageProducer;
-import org.apache.cassandra.net.MessageProducer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,16 +37,23 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.net.CachingMessageProducer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class MigrationManager implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = 
LoggerFactory.getLogger(MigrationManager.class);
-    
+
+    // avoids re-pushing migrations that we're waiting on target to apply 
already
+    private static Map<InetAddress,UUID> lastPushed = new 
MapMaker().expiration(1, TimeUnit.MINUTES).makeMap();
+
     /** I'm not going to act here. */
     public void onJoin(InetAddress endpoint, EndpointState epState) { }
 
@@ -89,8 +96,16 @@ public class MigrationManager implements
         }
         else if (!StorageService.instance.isClientMode())
         {
-            logger.debug("Their data definitions are old. Sending updates 
since {}", theirVersion.toString());
-            pushMigrations(theirVersion, myVersion, endpoint);
+            if (lastPushed.get(endpoint) == null || theirVersion.timestamp() 
>= lastPushed.get(endpoint).timestamp())
+            {
+                logger.debug("Schema on {} is old. Sending updates since {}", 
endpoint, theirVersion);
+                pushMigrations(theirVersion, myVersion, endpoint);
+            }
+            else
+            {
+                logger.debug("Waiting for {} to process migrations up to {} 
before sending more",
+                             endpoint, lastPushed.get(endpoint));
+            }
         }
     }
 
@@ -192,6 +207,7 @@ public class MigrationManager implements
         {
             Message msg = makeMigrationMessage(migrations, 
Gossiper.instance.getVersion(host));
             MessagingService.instance().sendOneWay(msg, host);
+            lastPushed.put(host, 
TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
         }
         catch (IOException ex)
         {

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1128814&r1=1128813&r2=1128814&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/ThriftValidation.java Sun May 29 03:54:32 2011
@@ -554,7 +554,8 @@ public class ThriftValidation
             for (ColumnDef c : cf_def.column_metadata)
             {
                 // Ensure that given idx_names and auto_generated idx_names 
cannot collide
-                String idxName = CFMetaData.indexName(cf_def.name, 
ColumnDefinition.fromColumnDef(c));
+                CFMetaData cfm = CFMetaData.fromThrift(cf_def);
+                String idxName = 
cfm.indexName(ColumnDefinition.fromColumnDef(c));
                 if (indexNames.contains(idxName))
                     throw new InvalidRequestException("Duplicate index names " 
+ idxName);
                 indexNames.add(idxName);


Reply via email to