Author: jbellis
Date: Tue Feb 8 15:51:44 2011
New Revision: 1068454
URL: http://svn.apache.org/viewvc?rev=1068454&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
- copied unchanged from r1068027,
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7:1026516-1068028
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Feb 8 15:51:44 2011
@@ -9,6 +9,11 @@
* backwards compatible internal messaging (CASSANDRA-1015)
+0.7.2
+ * cache writing moved to CompactionManager to reduce i/o contention and
+ updated to use non-cache-polluting writes (CASSANDRA-2053)
+
+
0.7.1
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1068028
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1068028
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1068028
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1068028
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 15:51:44 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1067497,1068028
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1068028
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue
Feb 8 15:51:44 2011
@@ -51,7 +51,7 @@ public final class CFMetaData
public final static double DEFAULT_READ_REPAIR_CHANCE = 1.0;
public final static boolean DEFAULT_REPLICATE_ON_WRITE = false;
public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
- public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 3600;
+ public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 4 *
3600;
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue
Feb 8 15:51:44 2011
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -61,9 +60,6 @@ public class ColumnFamilyStore implement
{
private static Logger logger =
LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
- new RetryingScheduledThreadPoolExecutor("CACHE-SAVER",
Thread.MIN_PRIORITY);
-
/*
* submitFlush first puts [Binary]Memtable.getSortedContents on the
flushSorter executor,
* which then puts the sorted results on the writer executor. This is
because sorting is CPU-bound,
@@ -137,22 +133,6 @@ public class ColumnFamilyStore implement
private volatile DefaultInteger memsize;
private volatile DefaultDouble memops;
- private final Runnable rowCacheSaverTask = new WrappedRunnable()
- {
- protected void runMayThrow() throws IOException
- {
- ssTables.saveRowCache();
- }
- };
-
- private final Runnable keyCacheSaverTask = new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- ssTables.saveKeyCache();
- }
- };
-
public void reload()
{
// metadata object has been mutated directly. make all the members
jibe with new settings.
@@ -573,29 +553,43 @@ public class ColumnFamilyStore implement
columnFamily));
if (rowCacheSavePeriodInSeconds > 0)
{
- cacheSavingExecutor.scheduleWithFixedDelay(rowCacheSaverTask,
-
rowCacheSavePeriodInSeconds,
-
rowCacheSavePeriodInSeconds,
- TimeUnit.SECONDS);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow()
+ {
+ submitRowCacheWrite();
+ }
+ };
+ StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+
rowCacheSavePeriodInSeconds,
+
rowCacheSavePeriodInSeconds,
+
TimeUnit.SECONDS);
}
if (keyCacheSavePeriodInSeconds > 0)
{
- cacheSavingExecutor.scheduleWithFixedDelay(keyCacheSaverTask,
-
keyCacheSavePeriodInSeconds,
-
keyCacheSavePeriodInSeconds,
- TimeUnit.SECONDS);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow()
+ {
+ submitKeyCacheWrite();
+ }
+ };
+ StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+
keyCacheSavePeriodInSeconds,
+
keyCacheSavePeriodInSeconds,
+
TimeUnit.SECONDS);
}
}
public Future<?> submitRowCacheWrite()
{
- return cacheSavingExecutor.submit(rowCacheSaverTask);
+ return
CompactionManager.instance.submitCacheWrite(ssTables.getRowCacheWriter());
}
public Future<?> submitKeyCacheWrite()
{
- return cacheSavingExecutor.submit(keyCacheSaverTask);
+ return
CompactionManager.instance.submitCacheWrite(ssTables.getKeyCacheWriter());
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue
Feb 8 15:51:44 2011
@@ -50,6 +50,7 @@ import org.apache.cassandra.service.Anti
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class CompactionManager implements CompactionManagerMBean
@@ -76,7 +77,7 @@ public class CompactionManager implement
private CompactionExecutor executor = new CompactionExecutor();
private Map<ColumnFamilyStore, Integer> estimatedCompactions = new
NonBlockingHashMap<ColumnFamilyStore, Integer>();
-
+
public Lock getCompactionLock()
{
return compactionLock;
@@ -407,7 +408,7 @@ public class CompactionManager implement
SSTableWriter writer;
CompactionIterator ci = new CompactionIterator(cfs, sstables,
gcBefore, major); // retain a handle so we can call close()
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
- executor.beginCompaction(cfs, ci);
+ executor.beginCompaction(cfs.columnFamily, ci);
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -503,7 +504,7 @@ public class CompactionManager implement
SSTableWriter writer = null;
SSTableScanner scanner =
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
- executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+ executor.beginCompaction(cfs.columnFamily, new
CleanupInfo(sstable, scanner));
try
{
while (scanner.hasNext())
@@ -598,7 +599,7 @@ public class CompactionManager implement
}
CompactionIterator ci = new ValidationCompactionIterator(cfs);
- executor.beginCompaction(cfs, ci);
+ executor.beginCompaction(cfs.columnFamily, ci);
try
{
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
@@ -693,7 +694,7 @@ public class CompactionManager implement
{
if (cfs.isInvalid())
return;
- executor.beginCompaction(cfs, builder);
+ executor.beginCompaction(cfs.columnFamily, builder);
builder.build();
}
finally
@@ -724,7 +725,7 @@ public class CompactionManager implement
compactionLock.lock();
try
{
- executor.beginCompaction(builder.cfs, builder);
+ executor.beginCompaction(builder.cfs.columnFamily,
builder);
return builder.build();
}
finally
@@ -736,6 +737,19 @@ public class CompactionManager implement
return executor.submit(callable);
}
+ public Future<?> submitCacheWrite(final CacheWriter writer)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ executor.beginCompaction(writer.getColumnFamily(), writer);
+ writer.saveCache();
+ }
+ };
+ return executor.submit(runnable);
+ }
+
private static class ValidationCompactionIterator extends
CompactionIterator
{
public ValidationCompactionIterator(ColumnFamilyStore cfs) throws
IOException
@@ -777,7 +791,7 @@ public class CompactionManager implement
private static class CompactionExecutor extends
DebuggableThreadPoolExecutor
{
- private volatile ColumnFamilyStore cfs;
+ private volatile String columnFamily;
private volatile ICompactionInfo ci;
public CompactionExecutor()
@@ -789,19 +803,19 @@ public class CompactionManager implement
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
- cfs = null;
+ columnFamily = null;
ci = null;
}
- void beginCompaction(ColumnFamilyStore cfs, ICompactionInfo ci)
+ void beginCompaction(String columnFamily, ICompactionInfo ci)
{
- this.cfs = cfs;
+ this.columnFamily = columnFamily;
this.ci = ci;
}
public String getColumnFamilyName()
{
- return cfs == null ? null : cfs.getColumnFamilyName();
+ return columnFamily == null ? null : columnFamily;
}
public Long getBytesTotal()
@@ -811,7 +825,7 @@ public class CompactionManager implement
public Long getBytesCompleted()
{
- return ci == null ? null : ci.getBytesRead();
+ return ci == null ? null : ci.getBytesComplete();
}
public String getType()
@@ -939,7 +953,7 @@ public class CompactionManager implement
return scanner.getFileLength();
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return scanner.getFilePointer();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Feb 8
15:51:44 2011
@@ -653,7 +653,7 @@ public class Table
return iter.getTotalBytes();
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return iter.getBytesRead();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
Tue Feb 8 15:51:44 2011
@@ -157,7 +157,7 @@ implements Closeable, ICompactionInfo
return totalBytes;
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return bytesRead;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java Tue
Feb 8 15:51:44 2011
@@ -25,7 +25,7 @@ public interface ICompactionInfo
{
public long getTotalBytes();
- public long getBytesRead();
+ public long getBytesComplete();
public String getTaskType();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
Tue Feb 8 15:51:44 2011
@@ -19,15 +19,12 @@
package org.apache.cassandra.io.sstable;
-import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
-import javax.swing.plaf.basic.BasicButtonListener;
-
import com.google.common.base.Function;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +32,6 @@ import org.apache.cassandra.cache.JMXIns
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Pair;
public class SSTableTracker implements Iterable<SSTableReader>
@@ -61,45 +57,7 @@ public class SSTableTracker implements I
rowCache = new JMXInstrumentedCache<DecoratedKey,
ColumnFamily>(ksname, cfname + "RowCache", 3);
}
- protected class CacheWriter<K, V>
- {
- public void saveCache(JMXInstrumentedCache<K, V> cache, File
savedCachePath, Function<K, ByteBuffer> converter) throws IOException
- {
- long start = System.currentTimeMillis();
- String msgSuffix = savedCachePath.getName() + " for " + cfname + "
of " + ksname;
- logger.info("saving " + msgSuffix);
- int count = 0;
- File tmpFile = File.createTempFile(savedCachePath.getName(), null,
savedCachePath.getParentFile());
-
- FileOutputStream fout = null;
- ObjectOutputStream out = null;
- try
- {
- fout = new FileOutputStream(tmpFile);
- out = new ObjectOutputStream(new BufferedOutputStream(fout));
- FileDescriptor fd = fout.getFD();
- for (K key : cache.getKeySet())
- {
- ByteBuffer bytes = converter.apply(key);
- ByteBufferUtil.writeWithLength(bytes, out);
- ++count;
- }
- out.flush();
- fd.sync();
- }
- finally
- {
- FileUtils.closeQuietly(out);
- FileUtils.closeQuietly(fout);
- }
- if (!tmpFile.renameTo(savedCachePath))
- throw new IOException("Unable to rename cache to " +
savedCachePath);
- if (logger.isDebugEnabled())
- logger.debug("saved " + count + " keys in " +
(System.currentTimeMillis() - start) + " ms from " + msgSuffix);
- }
- }
-
- public void saveKeyCache() throws IOException
+ public CacheWriter<Pair<Descriptor, DecoratedKey>, Long>
getKeyCacheWriter()
{
Function<Pair<Descriptor, DecoratedKey>, ByteBuffer> function = new
Function<Pair<Descriptor, DecoratedKey>, ByteBuffer>()
{
@@ -108,11 +66,10 @@ public class SSTableTracker implements I
return key.right.key;
}
};
- CacheWriter<Pair<Descriptor, DecoratedKey>, Long> writer = new
CacheWriter<Pair<Descriptor, DecoratedKey>, Long>();
- writer.saveCache(keyCache,
DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+ return new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>(cfname,
keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname),
function);
}
- public void saveRowCache() throws IOException
+ public CacheWriter<DecoratedKey, ColumnFamily> getRowCacheWriter()
{
Function<DecoratedKey, ByteBuffer> function = new
Function<DecoratedKey, ByteBuffer>()
{
@@ -121,8 +78,7 @@ public class SSTableTracker implements I
return key.key;
}
};
- CacheWriter<DecoratedKey, ColumnFamily> writer = new
CacheWriter<DecoratedKey, ColumnFamily>();
- writer.saveCache(rowCache,
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
+ return new CacheWriter<DecoratedKey, ColumnFamily>(cfname, rowCache,
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
}
public synchronized void replace(Collection<SSTableReader> oldSSTables,
Iterable<SSTableReader> replacements)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Tue Feb 8 15:51:44 2011
@@ -289,7 +289,7 @@ public class SSTableWriter extends SSTab
}
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return indexer.dfile.getFilePointer();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue
Feb 8 15:51:44 2011
@@ -277,7 +277,8 @@ public final class MessagingService impl
private void addCallback(IMessageCallback cb, String messageId,
InetAddress to)
{
- callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to,
cb));
+ Pair<InetAddress, IMessageCallback> previous =
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
+ assert previous == null;
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue
Feb 8 15:51:44 2011
@@ -220,7 +220,7 @@ public class StorageProxy implements Sto
if (destination.equals(FBUtilities.getLocalAddress()))
{
if (insertLocalMessages)
- insertLocalMessage(rm, responseHandler);
+ insertLocal(rm, responseHandler);
}
else
{
@@ -241,7 +241,8 @@ public class StorageProxy implements Sto
}
else
{
- // hinted
+ // hinted messages are unique, so there is no point to adding
a hop by forwarding via another node.
+ // thus, we use sendRR/sendOneWay directly here.
Message hintedMessage =
rm.getMessage(Gossiper.instance.getVersion(destination));
for (InetAddress target : targets)
{
@@ -249,24 +250,14 @@ public class StorageProxy implements Sto
{
addHintHeader(hintedMessage, target);
if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() +
"@" + destination + " for " + target);
+ logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
}
}
- // (non-destination hints are part of the callback and count
towards consistency only under CL.ANY)
+ // non-destination hints are part of the callback and count
towards consistency only under CL.ANY
if (targets.contains(destination) || consistency_level ==
ConsistencyLevel.ANY)
MessagingService.instance().sendRR(hintedMessage,
destination, responseHandler);
else
MessagingService.instance().sendOneWay(hintedMessage,
destination);
-
- Multimap<Message, InetAddress> messages = dcMessages.get(dc);
-
- if (messages == null)
- {
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
- }
-
- messages.put(hintedMessage, destination);
}
sendMessages(localDataCenter, dcMessages, responseHandler);
@@ -335,7 +326,7 @@ public class StorageProxy implements Sto
message.setHeader(RowMutation.HINT, bos.toByteArray());
}
- private static void insertLocalMessage(final RowMutation rm, final
IWriteResponseHandler responseHandler)
+ private static void insertLocal(final RowMutation rm, final
IWriteResponseHandler responseHandler)
{
if (logger.isDebugEnabled())
logger.debug("insert writing local " + rm.toString(true));
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1068454&r1=1068453&r2=1068454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue
Feb 8 15:51:44 2011
@@ -107,9 +107,10 @@ public class ExpiringMap<K, V>
timer.cancel();
}
- public void put(K key, V value)
+ public V put(K key, V value)
{
- cache.put(key, new CacheableObject<V>(value));
+ CacheableObject<V> previous = cache.put(key, new
CacheableObject<V>(value));
+ return (previous == null) ? null : previous.getValue();
}
public V get(K key)