Author: jbellis
Date: Tue Feb  8 15:51:44 2011
New Revision: 1068454

URL: http://svn.apache.org/viewvc?rev=1068454&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
      - copied unchanged from r1068027, cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7:1026516-1068028
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Feb  8 15:51:44 2011
@@ -9,6 +9,11 @@
  * backwards compatible internal messaging (CASSANDRA-1015)
 
 
+0.7.2
+ * cache writing moved to CompactionManager to reduce i/o contention and
+   updated to use non-cache-polluting writes (CASSANDRA-2053)
+
+
 0.7.1
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1068028
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1068028
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1068028
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1068028
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1068028
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue 
Feb  8 15:51:44 2011
@@ -51,7 +51,7 @@ public final class CFMetaData
     public final static double DEFAULT_READ_REPAIR_CHANCE = 1.0;
     public final static boolean DEFAULT_REPLICATE_ON_WRITE = false;
     public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
-    public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 3600;
+    public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 4 * 3600;
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
     public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue 
Feb  8 15:51:44 2011
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -61,9 +60,6 @@ public class ColumnFamilyStore implement
 {
     private static Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
-            new RetryingScheduledThreadPoolExecutor("CACHE-SAVER", 
Thread.MIN_PRIORITY);
-
     /*
      * submitFlush first puts [Binary]Memtable.getSortedContents on the 
flushSorter executor,
      * which then puts the sorted results on the writer executor.  This is 
because sorting is CPU-bound,
@@ -137,22 +133,6 @@ public class ColumnFamilyStore implement
     private volatile DefaultInteger memsize;
     private volatile DefaultDouble memops;
 
-    private final Runnable rowCacheSaverTask = new WrappedRunnable()
-    {
-        protected void runMayThrow() throws IOException
-        {
-            ssTables.saveRowCache();
-        }
-    };
-
-    private final Runnable keyCacheSaverTask = new WrappedRunnable()
-    {
-        protected void runMayThrow() throws Exception
-        {
-            ssTables.saveKeyCache();
-        }
-    };
-    
     public void reload()
     {
         // metadata object has been mutated directly. make all the members 
jibe with new settings.
@@ -573,29 +553,43 @@ public class ColumnFamilyStore implement
                                       columnFamily));
         if (rowCacheSavePeriodInSeconds > 0)
         {
-            cacheSavingExecutor.scheduleWithFixedDelay(rowCacheSaverTask,
-                                                       
rowCacheSavePeriodInSeconds,
-                                                       
rowCacheSavePeriodInSeconds,
-                                                       TimeUnit.SECONDS);
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow()
+                {
+                    submitRowCacheWrite();
+                }
+            };
+            StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+                                                                 rowCacheSavePeriodInSeconds,
+                                                                 rowCacheSavePeriodInSeconds,
+                                                                 TimeUnit.SECONDS);
         }
 
         if (keyCacheSavePeriodInSeconds > 0)
         {
-            cacheSavingExecutor.scheduleWithFixedDelay(keyCacheSaverTask,
-                                                       
keyCacheSavePeriodInSeconds,
-                                                       
keyCacheSavePeriodInSeconds,
-                                                       TimeUnit.SECONDS);
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow()
+                {
+                    submitKeyCacheWrite();
+                }
+            };
+            StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+                                                                 keyCacheSavePeriodInSeconds,
+                                                                 keyCacheSavePeriodInSeconds,
+                                                                 TimeUnit.SECONDS);
         }
     }
 
     public Future<?> submitRowCacheWrite()
     {
-        return cacheSavingExecutor.submit(rowCacheSaverTask);
+        return CompactionManager.instance.submitCacheWrite(ssTables.getRowCacheWriter());
     }
 
     public Future<?> submitKeyCacheWrite()
     {
-        return cacheSavingExecutor.submit(keyCacheSaverTask);
+        return CompactionManager.instance.submitCacheWrite(ssTables.getKeyCacheWriter());
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue 
Feb  8 15:51:44 2011
@@ -50,6 +50,7 @@ import org.apache.cassandra.service.Anti
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class CompactionManager implements CompactionManagerMBean
@@ -76,7 +77,7 @@ public class CompactionManager implement
 
     private CompactionExecutor executor = new CompactionExecutor();
     private Map<ColumnFamilyStore, Integer> estimatedCompactions = new 
NonBlockingHashMap<ColumnFamilyStore, Integer>();
-    
+
     public Lock getCompactionLock()
     {
         return compactionLock;
@@ -407,7 +408,7 @@ public class CompactionManager implement
         SSTableWriter writer;
         CompactionIterator ci = new CompactionIterator(cfs, sstables, 
gcBefore, major); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
-        executor.beginCompaction(cfs, ci);
+        executor.beginCompaction(cfs.columnFamily, ci);
 
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
@@ -503,7 +504,7 @@ public class CompactionManager implement
             SSTableWriter writer = null;
             SSTableScanner scanner = 
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
             SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
-            executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+            executor.beginCompaction(cfs.columnFamily, new CleanupInfo(sstable, scanner));
             try
             {
                 while (scanner.hasNext())
@@ -598,7 +599,7 @@ public class CompactionManager implement
         }
 
         CompactionIterator ci = new ValidationCompactionIterator(cfs);
-        executor.beginCompaction(cfs, ci);
+        executor.beginCompaction(cfs.columnFamily, ci);
         try
         {
             Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
@@ -693,7 +694,7 @@ public class CompactionManager implement
                 {
                     if (cfs.isInvalid())
                         return;
-                    executor.beginCompaction(cfs, builder);
+                    executor.beginCompaction(cfs.columnFamily, builder);
                     builder.build();
                 }
                 finally
@@ -724,7 +725,7 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
-                    executor.beginCompaction(builder.cfs, builder);
+                    executor.beginCompaction(builder.cfs.columnFamily, builder);
                     return builder.build();
                 }
                 finally
@@ -736,6 +737,19 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
+    public Future<?> submitCacheWrite(final CacheWriter writer)
+    {
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws IOException
+            {
+                executor.beginCompaction(writer.getColumnFamily(), writer);
+                writer.saveCache();
+            }
+        };
+        return executor.submit(runnable);
+    }
+
     private static class ValidationCompactionIterator extends 
CompactionIterator
     {
         public ValidationCompactionIterator(ColumnFamilyStore cfs) throws 
IOException
@@ -777,7 +791,7 @@ public class CompactionManager implement
 
     private static class CompactionExecutor extends 
DebuggableThreadPoolExecutor
     {
-        private volatile ColumnFamilyStore cfs;
+        private volatile String columnFamily;
         private volatile ICompactionInfo ci;
 
         public CompactionExecutor()
@@ -789,19 +803,19 @@ public class CompactionManager implement
         public void afterExecute(Runnable r, Throwable t)
         {
             super.afterExecute(r, t);
-            cfs = null;
+            columnFamily = null;
             ci = null;
         }
 
-        void beginCompaction(ColumnFamilyStore cfs, ICompactionInfo ci)
+        void beginCompaction(String columnFamily, ICompactionInfo ci)
         {
-            this.cfs = cfs;
+            this.columnFamily = columnFamily;
             this.ci = ci;
         }
 
         public String getColumnFamilyName()
         {
-            return cfs == null ? null : cfs.getColumnFamilyName();
+            return columnFamily == null ? null : columnFamily;
         }
 
         public Long getBytesTotal()
@@ -811,7 +825,7 @@ public class CompactionManager implement
 
         public Long getBytesCompleted()
         {
-            return ci == null ? null : ci.getBytesRead();
+            return ci == null ? null : ci.getBytesComplete();
         }
 
         public String getType()
@@ -939,7 +953,7 @@ public class CompactionManager implement
             return scanner.getFileLength();
         }
 
-        public long getBytesRead()
+        public long getBytesComplete()
         {
             return scanner.getFilePointer();
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Feb  8 
15:51:44 2011
@@ -653,7 +653,7 @@ public class Table
             return iter.getTotalBytes();
         }
 
-        public long getBytesRead()
+        public long getBytesComplete()
         {
             return iter.getBytesRead();
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
Tue Feb  8 15:51:44 2011
@@ -157,7 +157,7 @@ implements Closeable, ICompactionInfo
         return totalBytes;
     }
 
-    public long getBytesRead()
+    public long getBytesComplete()
     {
         return bytesRead;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java Tue 
Feb  8 15:51:44 2011
@@ -25,7 +25,7 @@ public interface ICompactionInfo
 {
     public long getTotalBytes();
 
-    public long getBytesRead();
+    public long getBytesComplete();
 
     public String getTaskType();
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java 
Tue Feb  8 15:51:44 2011
@@ -19,15 +19,12 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.swing.plaf.basic.BasicButtonListener;
-
 import com.google.common.base.Function;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +32,6 @@ import org.apache.cassandra.cache.JMXIns
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.Pair;
 
 public class SSTableTracker implements Iterable<SSTableReader>
@@ -61,45 +57,7 @@ public class SSTableTracker implements I
         rowCache = new JMXInstrumentedCache<DecoratedKey, 
ColumnFamily>(ksname, cfname + "RowCache", 3);
     }
 
-    protected class CacheWriter<K, V>
-    {
-        public void saveCache(JMXInstrumentedCache<K, V> cache, File 
savedCachePath, Function<K, ByteBuffer> converter) throws IOException
-        {
-            long start = System.currentTimeMillis();
-            String msgSuffix = savedCachePath.getName() + " for " + cfname + " 
of " + ksname;
-            logger.info("saving " + msgSuffix);
-            int count = 0;
-            File tmpFile = File.createTempFile(savedCachePath.getName(), null, 
savedCachePath.getParentFile());
-            
-            FileOutputStream fout = null;
-            ObjectOutputStream out = null;
-            try
-            {
-                fout = new FileOutputStream(tmpFile);
-                out = new ObjectOutputStream(new BufferedOutputStream(fout));
-                FileDescriptor fd = fout.getFD();
-                for (K key : cache.getKeySet())
-                {
-                    ByteBuffer bytes = converter.apply(key);
-                    ByteBufferUtil.writeWithLength(bytes, out);
-                    ++count;
-                }
-                out.flush();
-                fd.sync();
-            }
-            finally
-            {
-                FileUtils.closeQuietly(out);
-                FileUtils.closeQuietly(fout);
-            }
-            if (!tmpFile.renameTo(savedCachePath))
-                throw new IOException("Unable to rename cache to " + 
savedCachePath);
-            if (logger.isDebugEnabled())
-                logger.debug("saved " + count + " keys in " + 
(System.currentTimeMillis() - start) + " ms from " + msgSuffix);
-        }
-    }
-
-    public void saveKeyCache() throws IOException
+    public CacheWriter<Pair<Descriptor, DecoratedKey>, Long> getKeyCacheWriter()
     {
         Function<Pair<Descriptor, DecoratedKey>, ByteBuffer> function = new 
Function<Pair<Descriptor, DecoratedKey>, ByteBuffer>()
         {
@@ -108,11 +66,10 @@ public class SSTableTracker implements I
                 return key.right.key;
             }
         };
-        CacheWriter<Pair<Descriptor, DecoratedKey>, Long> writer = new 
CacheWriter<Pair<Descriptor, DecoratedKey>, Long>();
-        writer.saveCache(keyCache, 
DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+        return new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>(cfname, keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
     }
 
-    public void saveRowCache() throws IOException
+    public CacheWriter<DecoratedKey, ColumnFamily> getRowCacheWriter()
     {
         Function<DecoratedKey, ByteBuffer> function = new 
Function<DecoratedKey, ByteBuffer>()
         {
@@ -121,8 +78,7 @@ public class SSTableTracker implements I
                 return key.key;
             }
         };
-        CacheWriter<DecoratedKey, ColumnFamily> writer = new 
CacheWriter<DecoratedKey, ColumnFamily>();
-        writer.saveCache(rowCache, 
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
+        return new CacheWriter<DecoratedKey, ColumnFamily>(cfname, rowCache, DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
     }
 
     public synchronized void replace(Collection<SSTableReader> oldSSTables, 
Iterable<SSTableReader> replacements)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
Tue Feb  8 15:51:44 2011
@@ -289,7 +289,7 @@ public class SSTableWriter extends SSTab
             }
         }
 
-        public long getBytesRead()
+        public long getBytesComplete()
         {
             return indexer.dfile.getFilePointer();
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue 
Feb  8 15:51:44 2011
@@ -277,7 +277,8 @@ public final class MessagingService impl
 
     private void addCallback(IMessageCallback cb, String messageId, 
InetAddress to)
     {
-        callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
+        Pair<InetAddress, IMessageCallback> previous = callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
+        assert previous == null;
     }
     
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue 
Feb  8 15:51:44 2011
@@ -220,7 +220,7 @@ public class StorageProxy implements Sto
                 if (destination.equals(FBUtilities.getLocalAddress()))
                 {
                     if (insertLocalMessages)
-                        insertLocalMessage(rm, responseHandler);
+                        insertLocal(rm, responseHandler);
                 }
                 else
                 {
@@ -241,7 +241,8 @@ public class StorageProxy implements Sto
             }
             else
             {
-                // hinted
+                // hinted messages are unique, so there is no point to adding a hop by forwarding via another node.
+                // thus, we use sendRR/sendOneWay directly here.
                 Message hintedMessage = 
rm.getMessage(Gossiper.instance.getVersion(destination));
                 for (InetAddress target : targets)
                 {
@@ -249,24 +250,14 @@ public class StorageProxy implements Sto
                     {
                         addHintHeader(hintedMessage, target);
                         if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + 
ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + 
"@" + destination + " for " + target);
+                            logger.debug("insert writing key " + 
ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
                     }
                 }
-                // (non-destination hints are part of the callback and count 
towards consistency only under CL.ANY)
+                // non-destination hints are part of the callback and count 
towards consistency only under CL.ANY
                 if (targets.contains(destination) || consistency_level == 
ConsistencyLevel.ANY)
                     MessagingService.instance().sendRR(hintedMessage, 
destination, responseHandler);
                 else
                     MessagingService.instance().sendOneWay(hintedMessage, 
destination);
-
-                Multimap<Message, InetAddress> messages = dcMessages.get(dc);
-
-                if (messages == null)
-                {
-                   messages = HashMultimap.create();
-                   dcMessages.put(dc, messages);
-                }
-
-                messages.put(hintedMessage, destination);
             }
 
             sendMessages(localDataCenter, dcMessages, responseHandler);
@@ -335,7 +326,7 @@ public class StorageProxy implements Sto
         message.setHeader(RowMutation.HINT, bos.toByteArray());
     }
 
-    private static void insertLocalMessage(final RowMutation rm, final 
IWriteResponseHandler responseHandler)
+    private static void insertLocal(final RowMutation rm, final 
IWriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())
             logger.debug("insert writing local " + rm.toString(true));

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue 
Feb  8 15:51:44 2011
@@ -107,9 +107,10 @@ public class ExpiringMap<K, V>
         timer.cancel();
     }
 
-    public void put(K key, V value)
+    public V put(K key, V value)
     {
-        cache.put(key, new CacheableObject<V>(value));
+        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value));
+        return (previous == null) ? null : previous.getValue();
     }
 
     public V get(K key)


Reply via email to