Author: jbellis
Date: Fri Aug 26 19:39:04 2011
New Revision: 1162220
URL: http://svn.apache.org/viewvc?rev=1162220&view=rev
Log:
Revert "add LeveledCompactionStrategy"
Removed:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java
cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java
cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java
cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java
cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 26 19:39:04 2011
@@ -44,8 +44,6 @@
Thrift<->Avro conversion methods (CASSANDRA-3032)
* Add timeouts to client request schedulers (CASSANDRA-3079)
* Cli to use hashes rather than array of hashes for strategy options
(CASSANDRA-3081)
- * LeveledCompactionStrategy (CASSANDRA-1608)
-
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Aug 26 19:39:04 2011
@@ -67,7 +67,7 @@ public final class CFMetaData
public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS =
sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1;
public final static String DEFAULT_ROW_CACHE_PROVIDER =
"org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider";
- public final static String DEFAULT_COMPACTION_STRATEGY_CLASS =
"SizeTieredCompactionStrategy";
+ public final static String DEFAULT_COMPACTION_STRATEGY_CLASS =
"org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy";
public final static ByteBuffer DEFAULT_KEY_NAME =
ByteBufferUtil.bytes("KEY");
public final static boolean DEFAULT_COMPRESSION = false;
@@ -214,11 +214,11 @@ public final class CFMetaData
try
{
- compactionStrategyClass =
createCompactionSrategy(DEFAULT_COMPACTION_STRATEGY_CLASS);
+ compactionStrategyClass = (Class<? extends
AbstractCompactionStrategy>)Class.forName(DEFAULT_COMPACTION_STRATEGY_CLASS);
}
- catch (ConfigurationException e)
+ catch (Exception e)
{
- throw new AssertionError(e);
+ throw new RuntimeException("Could not create Compaction Strategy
of type " + DEFAULT_COMPACTION_STRATEGY_CLASS, e);
}
compactionStrategyOptions = new HashMap<String, String>();
}
@@ -409,11 +409,11 @@ public final class CFMetaData
{
try
{
- newCFMD.compactionStrategyClass =
createCompactionSrategy(cf.compaction_strategy.toString());
+ newCFMD.compactionStrategyClass((Class<? extends
AbstractCompactionStrategy>)Class.forName(cf.compaction_strategy.toString()));
}
- catch (ConfigurationException e)
+ catch (Exception e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException("Could not create Compaction
Strategy of type " + cf.compaction_strategy.toString(), e);
}
}
if (cf.compaction_strategy_options != null)
@@ -695,7 +695,16 @@ public final class CFMetaData
if (cf_def.isSetKey_alias()) { newCFMD.keyAlias(cf_def.key_alias); }
if (cf_def.isSetKey_validation_class()) {
newCFMD.keyValidator(TypeParser.parse(cf_def.key_validation_class)); }
if (cf_def.isSetCompaction_strategy())
- newCFMD.compactionStrategyClass =
createCompactionSrategy(cf_def.compaction_strategy);
+ {
+ try
+ {
+ newCFMD.compactionStrategyClass((Class<? extends
AbstractCompactionStrategy>)Class.forName(cf_def.compaction_strategy));
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Unable to set Compaction
Strategy Class of " + cf_def.compaction_strategy, e);
+ }
+ }
if (cf_def.isSetCompaction_strategy_options())
newCFMD.compactionStrategyOptions(new HashMap<String,
String>(cf_def.compaction_strategy_options));
@@ -803,7 +812,16 @@ public final class CFMetaData
}
if (cf_def.compaction_strategy != null)
- compactionStrategyClass =
createCompactionSrategy(cf_def.compaction_strategy.toString());
+ {
+ try
+ {
+ compactionStrategyClass = (Class<? extends
AbstractCompactionStrategy>)Class.forName(cf_def.compaction_strategy.toString());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Could not create Compaction
Strategy of type " + cf_def.compaction_strategy.toString(), e);
+ }
+ }
if (null != cf_def.compaction_strategy_options)
{
@@ -814,20 +832,7 @@ public final class CFMetaData
logger.debug("application result is {}", this);
}
-
- private static Class<? extends AbstractCompactionStrategy>
createCompactionSrategy(String className) throws ConfigurationException
- {
- className = className.contains(".") ? className :
"org.apache.cassandra.db.compaction." + className;
- try
- {
- return (Class<? extends AbstractCompactionStrategy>)
Class.forName(className);
- }
- catch (Exception e)
- {
- throw new ConfigurationException("Could not create Compaction
Strategy of type " + className, e);
- }
- }
-
+
public AbstractCompactionStrategy
createCompactionStrategyInstance(ColumnFamilyStore cfs)
{
try
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java Fri Aug 26 19:39:04 2011
@@ -19,14 +19,6 @@
*/
package org.apache.cassandra.db;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
@@ -36,7 +28,14 @@ import org.apache.cassandra.db.marshal.C
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.IntervalTree.Interval;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
public class CollationController
{
@@ -76,7 +75,7 @@ public class CollationController
logger.debug("collectTimeOrderedData");
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
final ColumnFamily container = ColumnFamily.create(metadata, factory,
filter.filter.isReversed());
- List<SSTableReader> sstables = null;
+
try
{
for (Memtable memtable :
Iterables.concat(dataview.memtablesPendingFlush,
Collections.singleton(dataview.memtable)))
@@ -97,12 +96,8 @@ public class CollationController
filterColumns.addAll(((NamesQueryFilter) filter.filter).columns);
QueryFilter reducedFilter = new QueryFilter(filter.key,
filter.path, new NamesQueryFilter(filterColumns));
- /* add the SSTables on disk */
- sstables = dataview.intervalTree.search(new Interval(filter.key,
filter.key));
- Collections.sort(sstables, SSTable.maxTimestampComparator);
- SSTableReader.acquireReferences(sstables);
// read sorted sstables
- for (SSTableReader sstable : sstables)
+ for (SSTableReader sstable : dataview.sstables)
{
long currentMaxTs = sstable.getMaxTimestamp();
reduceNameFilter(reducedFilter, container, currentMaxTs);
@@ -122,7 +117,6 @@ public class CollationController
}
finally
{
- SSTableReader.releaseReferences(sstables);
for (IColumnIterator iter : iterators)
FileUtils.closeQuietly(iter);
}
@@ -188,7 +182,6 @@ public class CollationController
logger.debug("collectAllData");
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
ColumnFamily returnCF = ColumnFamily.create(metadata, factory,
filter.filter.isReversed());
- List<SSTableReader> sstables = null;
try
{
@@ -203,9 +196,7 @@ public class CollationController
}
/* add the SSTables on disk */
- sstables = dataview.intervalTree.search(new Interval(filter.key,
filter.key));
- SSTableReader.acquireReferences(sstables);
- for (SSTableReader sstable : sstables)
+ for (SSTableReader sstable : dataview.sstables)
{
IColumnIterator iter =
filter.getSSTableColumnIterator(sstable);
iterators.add(iter);
@@ -218,7 +209,6 @@ public class CollationController
}
finally
{
- SSTableReader.releaseReferences(sstables);
for (IColumnIterator iter : iterators)
FileUtils.closeQuietly(iter);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 26 19:39:04 2011
@@ -35,7 +35,11 @@ import com.google.common.collect.Iterabl
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.AutoSavingKeyCache;
+import org.apache.cassandra.cache.AutoSavingRowCache;
+import org.apache.cassandra.cache.ConcurrentLinkedHashCache;
+import org.apache.cassandra.cache.ICache;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -45,20 +49,17 @@ import org.apache.cassandra.db.commitlog
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.filter.IFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LocalByPartionerType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.IntervalTree.Interval;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@ -136,7 +137,7 @@ public class ColumnFamilyStore implement
private volatile DefaultDouble memops;
private volatile DefaultInteger rowCacheSaveInSeconds;
private volatile DefaultInteger keyCacheSaveInSeconds;
- private volatile DefaultInteger rowCacheKeysToSave;
+ private volatile DefaultInteger rowCacheKeysToSave;
/** Lock to allow migrations to block all flushing, so we can be sure not
to write orphaned data files */
public final Lock flushLock = new ReentrantLock();
@@ -172,7 +173,7 @@ public class ColumnFamilyStore implement
public void reload()
{
// metadata object has been mutated directly. make all the members
jibe with new settings.
-
+
// only update these runtime-modifiable settings if they have not been
modified.
if (!minCompactionThreshold.isModified())
for (ColumnFamilyStore cfs : concatWithIndexes())
@@ -193,12 +194,11 @@ public class ColumnFamilyStore implement
if (!rowCacheKeysToSave.isModified())
rowCacheKeysToSave = new
DefaultInteger(metadata.getRowCacheKeysToSave());
- compactionStrategy.shutdown();
compactionStrategy = metadata.createCompactionStrategyInstance(this);
updateCacheSizes();
scheduleCacheSaving(rowCacheSaveInSeconds.value(),
keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
-
+
indexManager.reload();
}
@@ -206,10 +206,11 @@ public class ColumnFamilyStore implement
{
assert metadata != null : "null metadata for " + table + ":" +
columnFamilyName;
this.table = table;
- columnFamily = columnFamilyName;
+ columnFamily = columnFamilyName;
this.metadata = metadata;
this.minCompactionThreshold = new
DefaultInteger(metadata.getMinCompactionThreshold());
this.maxCompactionThreshold = new
DefaultInteger(metadata.getMaxCompactionThreshold());
+ this.compactionStrategy =
metadata.createCompactionStrategyInstance(this);
this.memsize = new
DefaultInteger(metadata.getMemtableThroughputInMb());
this.memops = new
DefaultDouble(metadata.getMemtableOperationsInMillions());
this.rowCacheSaveInSeconds = new
DefaultInteger(metadata.getRowCacheSavePeriodInSeconds());
@@ -240,9 +241,6 @@ public class ColumnFamilyStore implement
}
data.addSSTables(sstables);
- // compaction strategy should be created after the CFS has been
prepared
- this.compactionStrategy =
metadata.createCompactionStrategyInstance(this);
-
// create the private ColumnFamilyStores for the secondary column
indexes
for (ColumnDefinition info : metadata.getColumn_metadata().values())
{
@@ -339,7 +337,7 @@ public class ColumnFamilyStore implement
return new ColumnFamilyStore(table, columnFamily, partitioner, value,
metadata);
}
-
+
/**
* Removes unnecessary files from the cf directory at startup: these
include temp files, orphans, zero-length files
* and compacted sstables. Files that cannot be recognized will be ignored.
@@ -397,7 +395,7 @@ public class ColumnFamilyStore implement
if (!file.delete())
logger.warn("could not delete " +
file.getAbsolutePath());
}
-
+
// also clean out any index leftovers.
CFMetaData cfm = Schema.instance.getCFMetaData(table, columnFamily);
if (cfm != null) // secondary indexes aren't stored in DD.
@@ -888,21 +886,12 @@ public class ColumnFamilyStore implement
*/
public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends
SSTable> sstablesToIgnore)
{
- DataTracker.View currentView = markCurrentViewReferenced();
- try
+ for (SSTableReader sstable : data.getSSTables())
{
- List<SSTableReader> filteredSSTables =
currentView.intervalTree.search(new Interval(key, key));
- for (SSTableReader sstable : filteredSSTables)
- {
- if (!sstablesToIgnore.contains(sstable) &&
sstable.getBloomFilter().isPresent(key.key))
- return true;
- }
- return false;
- }
- finally
- {
- SSTableReader.releaseReferences(currentView.sstables);
+ if (!sstablesToIgnore.contains(sstable) &&
sstable.getBloomFilter().isPresent(key.key))
+ return true;
}
+ return false;
}
/*
@@ -994,7 +983,7 @@ public class ColumnFamilyStore implement
public void removeAllSSTables()
{
data.removeAllSSTables();
- indexManager.removeAllIndexes();
+ indexManager.removeAllIndexes();
}
public long getMemtableColumnsCount()
@@ -1188,7 +1177,7 @@ public class ColumnFamilyStore implement
ColumnFamily cached = cacheRow(filter.key);
if (cached == null)
return null;
-
+
return filterColumnFamily(cached, filter, gcBefore);
}
finally
@@ -1230,7 +1219,7 @@ public class ColumnFamilyStore implement
// top-level columns
if (sliceFilter.count >= cached.getColumnCount())
{
- removeDeletedColumnsOnly(cached, gcBefore);
+ removeDeletedColumnsOnly(cached, gcBefore);
return removeDeletedCF(cached, gcBefore);
}
}
@@ -1290,12 +1279,12 @@ public class ColumnFamilyStore implement
DataTracker.View currentView = markCurrentViewReferenced();
try
{
- CollationController controller = new CollationController(currentView,
factory, filter, metadata, gcBefore);
- ColumnFamily columns = controller.getTopLevelColumns();
- recentSSTablesPerRead.add(controller.getSstablesIterated());
- sstablesPerRead.add(controller.getSstablesIterated());
- return columns;
- }
+ CollationController controller = new
CollationController(currentView, factory, filter, metadata, gcBefore);
+ ColumnFamily columns = controller.getTopLevelColumns();
+ recentSSTablesPerRead.add(controller.getSstablesIterated());
+ sstablesPerRead.add(controller.getSstablesIterated());
+ return columns;
+ }
finally
{
SSTableReader.releaseReferences(currentView.sstables);
@@ -1304,7 +1293,7 @@ public class ColumnFamilyStore implement
/**
* Fetch a range of rows and columns from memtables/sstables.
- *
+ *
* @param superColumn optional SuperColumn to slice subcolumns of; null
to slice top-level columns
* @param range Either a Bounds, which includes start key, or a Range,
which does not.
* @param maxResults Maximum rows to return
@@ -1333,14 +1322,6 @@ public class ColumnFamilyStore implement
// It is fine to aliases the View.sstables since it's an
unmodifiable collection
Collection<SSTableReader> sstables = currentView.sstables;
- Comparable startWithComp = startWith;
- Comparable stopAtComp = stopAt;
- if (startWith.token.equals(partitioner.getMinimumToken()))
- startWithComp = currentView.intervalTree.min;
- if (stopAt.token.equals(partitioner.getMinimumToken()))
- stopAtComp = currentView.intervalTree.max;
- sstables = currentView.intervalTree.search(new
Interval(startWithComp, stopAtComp));
-
CloseableIterator<Row> iterator =
RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter,
getComparator(), this);
List<Row> rows = new ArrayList<Row>();
@@ -1392,12 +1373,12 @@ public class ColumnFamilyStore implement
SSTableReader.releaseReferences(currentView.sstables);
}
}
-
+
public List<Row> search(IndexClause clause, AbstractBounds range, IFilter
dataFilter)
{
return indexManager.search(clause, range, dataFilter);
}
-
+
public AbstractType getComparator()
{
return metadata.comparator;
@@ -1440,8 +1421,8 @@ public class ColumnFamilyStore implement
/**
* Take a snap shot of this columnfamily store.
- *
- * @param snapshotName the name of the associated with the snapshot
+ *
+ * @param snapshotName the name of the associated with the snapshot
*/
public void snapshot(String snapshotName)
{
@@ -1696,7 +1677,7 @@ public class ColumnFamilyStore implement
return data.getRecentBloomFilterFalseRatio();
}
-
+
@Override
public String toString()
@@ -1734,7 +1715,7 @@ public class ColumnFamilyStore implement
{
return minCompactionThreshold.value();
}
-
+
public void setMinimumCompactionThreshold(int minCompactionThreshold)
{
if ((minCompactionThreshold > this.maxCompactionThreshold.value()) &&
this.maxCompactionThreshold.value() != 0)
@@ -1957,11 +1938,4 @@ public class ColumnFamilyStore implement
return reader;
}
-
- public int getUnleveledSSTables()
- {
- return this.compactionStrategy instanceof LeveledCompactionStrategy
- ? ((LeveledCompactionStrategy)
this.compactionStrategy).getLevelSize(0)
- : 0;
- }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Fri Aug 26 19:39:04 2011
@@ -240,9 +240,4 @@ public interface ColumnFamilyStoreMBean
* determine which SSTables should be loaded and load them
*/
public void loadNewSSTables();
-
- /**
- * @return the number of SSTables in L0. Always return 0 if Leveled
compaction is not enabled.
- */
- public int getUnleveledSSTables();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Aug 26 19:39:04 2011
@@ -20,38 +20,30 @@
package org.apache.cassandra.db;
import java.io.File;
+import java.io.IOError;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.notifications.INotification;
-import org.apache.cassandra.notifications.INotificationConsumer;
-import org.apache.cassandra.notifications.SSTableAddedNotification;
-import org.apache.cassandra.notifications.SSTableListChangedNotification;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.IntervalTree.Interval;
-import org.apache.cassandra.utils.IntervalTree.IntervalTree;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
public class DataTracker
{
private static final Logger logger =
LoggerFactory.getLogger(DataTracker.class);
- public static Collection<INotificationConsumer> subscribers = new
CopyOnWriteArrayList<INotificationConsumer>();
-
public final ColumnFamilyStore cfstore;
private final AtomicReference<View> view;
@@ -142,27 +134,26 @@ public class DataTracker
addNewSSTablesSize(Arrays.asList(sstable));
cfstore.updateCacheSizes();
- notifyAdded(sstable);
incrementallyBackup(sstable);
}
- public void incrementallyBackup(final SSTableReader sstable)
+ public void incrementallyBackup(SSTableReader sstable)
{
- if (!DatabaseDescriptor.incrementalBackupsEnabled())
- return;
-
- Runnable runnable = new WrappedRunnable()
+ if (DatabaseDescriptor.incrementalBackupsEnabled())
{
- protected void runMayThrow() throws Exception
+ File keyspaceDir = new File(sstable.getFilename()).getParentFile();
+ File backupsDir = new File(keyspaceDir, "backups");
+ try
{
- File keyspaceDir = new
File(sstable.getFilename()).getParentFile();
- File backupsDir = new File(keyspaceDir, "backups");
if (!backupsDir.exists() && !backupsDir.mkdirs())
throw new IOException("Unable to create " + backupsDir);
sstable.createLinks(backupsDir.getCanonicalPath());
}
- };
- StorageService.tasks.execute(runnable);
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
}
/**
@@ -242,7 +233,6 @@ public class DataTracker
{
addSSTables(Arrays.asList(sstable));
incrementallyBackup(sstable);
- notifyAdded(sstable);
}
public void removeAllSSTables()
@@ -256,8 +246,7 @@ public class DataTracker
view.set(new View(new Memtable(cfstore),
Collections.<Memtable>emptySet(),
Collections.<SSTableReader>emptyList(),
- Collections.<SSTableReader>emptySet(),
- new IntervalTree()));
+ Collections.<SSTableReader>emptySet()));
}
private void replace(Collection<SSTableReader> oldSSTables,
Iterable<SSTableReader> replacements)
@@ -273,7 +262,6 @@ public class DataTracker
addNewSSTablesSize(replacements);
removeOldSSTablesSize(oldSSTables);
- notifySSTablesChanged(replacements, oldSSTables);
cfstore.updateCacheSizes();
}
@@ -467,35 +455,6 @@ public class DataTracker
return (double) falseCount / (trueCount + falseCount);
}
- public void notifySSTablesChanged(Iterable<SSTableReader> added,
Iterable<SSTableReader> removed)
- {
- for (INotificationConsumer subscriber : subscribers)
- {
- INotification notification = new
SSTableListChangedNotification(added, removed);
- subscriber.handleNotification(notification, this);
- }
- }
-
- public void notifyAdded(SSTableReader added)
- {
- for (INotificationConsumer subscriber : subscribers)
- {
- INotification notification = new SSTableAddedNotification(added);
- subscriber.handleNotification(notification, this);
- }
- }
-
- public static void subscribe(INotificationConsumer consumer)
- {
- subscribers.add(consumer);
- }
-
- public static void unsubscribe(INotificationConsumer consumer)
- {
- boolean found = subscribers.remove(consumer);
- assert found : consumer + " not subscribed";
- }
-
/**
* An immutable structure holding the current memtable, the memtables
pending
* flush, the sstables for a column family, and the sstables that are
active
@@ -512,63 +471,49 @@ public class DataTracker
// Obviously, dropping sstables whose max column timestamp happens to
be equal to another's
// is not acceptable for us. So, we use a List instead.
public final List<SSTableReader> sstables;
- public final IntervalTree intervalTree;
- View(Memtable memtable, Set<Memtable> pendingFlush,
List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree
intervalTree)
+ View(Memtable memtable, Set<Memtable> pendingFlush,
List<SSTableReader> sstables, Set<SSTableReader> compacting)
{
this.memtable = memtable;
this.memtablesPendingFlush = pendingFlush;
this.sstables = sstables;
this.compacting = compacting;
- this.intervalTree = intervalTree;
- }
-
- private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
- {
- List<SSTableReader> itsstList =
ImmutableList.copyOf(Ordering.from(SSTable.sstableComparator).sortedCopy(sstables));
- List<Interval> intervals = new
ArrayList<Interval>(itsstList.size());
- for (SSTableReader sstable : itsstList)
- intervals.add(new Interval<SSTableReader>(sstable.first,
sstable.last, sstable));
- assert intervals.size() == sstables.size();
- return new IntervalTree<SSTableReader>(intervals);
}
public View switchMemtable(Memtable newMemtable)
{
Set<Memtable> newPending =
ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
- return new View(newMemtable, newPending, sstables, compacting,
intervalTree);
+ return new View(newMemtable, newPending, sstables, compacting);
}
public View renewMemtable(Memtable newMemtable)
{
- return new View(newMemtable, memtablesPendingFlush, sstables,
compacting, intervalTree);
+ return new View(newMemtable, memtablesPendingFlush, sstables,
compacting);
}
public View replaceFlushed(Memtable flushedMemtable, SSTableReader
newSSTable)
{
Set<Memtable> newPending =
ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush,
Collections.singleton(flushedMemtable)));
List<SSTableReader> newSSTables = newSSTables(newSSTable);
- IntervalTree intervalTree = buildIntervalTree(newSSTables);
- return new View(memtable, newPending,
Collections.unmodifiableList(newSSTables), compacting, intervalTree);
+ return new View(memtable, newPending,
Collections.unmodifiableList(newSSTables), compacting);
}
public View replace(Collection<SSTableReader> oldSSTables,
Iterable<SSTableReader> replacements)
{
List<SSTableReader> newSSTables = newSSTables(oldSSTables,
replacements);
- IntervalTree intervalTree = buildIntervalTree(newSSTables);
- return new View(memtable, memtablesPendingFlush,
Collections.unmodifiableList(newSSTables), compacting, intervalTree);
+ return new View(memtable, memtablesPendingFlush,
Collections.unmodifiableList(newSSTables), compacting);
}
public View markCompacting(Collection<SSTableReader> tomark)
{
Set<SSTableReader> compactingNew =
ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
- return new View(memtable, memtablesPendingFlush, sstables,
compactingNew, intervalTree);
+ return new View(memtable, memtablesPendingFlush, sstables,
compactingNew);
}
public View unmarkCompacting(Collection<SSTableReader> tounmark)
{
Set<SSTableReader> compactingNew =
ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
- return new View(memtable, memtablesPendingFlush, sstables,
compactingNew, intervalTree);
+ return new View(memtable, memtablesPendingFlush, sstables,
compactingNew);
}
private List<SSTableReader> newSSTables(SSTableReader newSSTable)
@@ -589,6 +534,7 @@ public class DataTracker
}
Iterables.addAll(newSSTables, replacements);
assert newSSTables.size() == newSSTablesSize;
+ Collections.sort(newSSTables, SSTable.maxTimestampComparator);
return newSSTables;
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Fri Aug 26 19:39:04 2011
@@ -89,6 +89,7 @@ public class RowIteratorFactory
iterators.add(new ConvertToColumnIterator(filter, comparator, p,
memtable.getEntryIterator(startWith)));
}
+ // sstables
for (SSTableReader sstable : sstables)
{
final SSTableScanner scanner =
sstable.getScanner(RANGE_FILE_BUFFER_SIZE, filter);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java Fri Aug 26 19:39:04 2011
@@ -50,18 +50,11 @@ public abstract class AbstractCompaction
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String,
String> options)
{
- assert cfs != null;
this.cfs = cfs;
this.options = options;
}
/**
- * Releases any resources if this strategy is shutdown (when the CFS is reloaded after a schema change).
- * Default is to do nothing.
- */
- public void shutdown() { }
-
- /**
* @return a list of compaction tasks that should run in the background to get the sstable
* count down to desired parameters. Will not be null, but may be empty.
* @param gcBefore throw away tombstones older than this
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Fri Aug 26 19:39:04 2011
@@ -19,8 +19,14 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterators;
@@ -43,7 +49,6 @@ public class CompactionTask extends Abst
protected String compactionFileLocation;
protected final int gcBefore;
protected boolean isUserDefined;
- protected static long totalBytesCompacted = 0;
public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, final int gcBefore)
{
@@ -53,11 +58,6 @@ public class CompactionTask extends Abst
this.isUserDefined = false;
}
- public static synchronized long addToTotalBytesCompacted(long
bytesCompacted)
- {
- return totalBytesCompacted += bytesCompacted;
- }
-
/**
* For internal use and testing only. The rest of the system should go through the submit* methods,
* which are properly serialized.
@@ -72,7 +72,7 @@ public class CompactionTask extends Abst
Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
if (!isUserDefined)
{
- if ( !allowSingletonCompaction() && toCompact.size() < 2)
+ if (toCompact.size() < 2)
{
logger.info("Nothing to compact in " +
cfs.getColumnFamilyName() + "." +
"Use forceUserDefinedCompaction if you wish to
force compaction of single sstables " +
@@ -128,18 +128,13 @@ public class CompactionTask extends Abst
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
+ SSTableWriter writer = null;
+ final SSTableReader ssTable;
CompactionIterable ci = new CompactionIterable(type, toCompact,
controller); // retain a handle so we can call close()
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter,
Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new
HashMap<SSTableReader, Map<DecoratedKey, Long>>();
-
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
if (collector != null)
collector.beginCompaction(ci);
try
@@ -153,14 +148,13 @@ public class CompactionTask extends Abst
return 0;
}
- SSTableWriter writer =
cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation,
toCompact);
- writers.add(writer);
+ writer = cfs.createCompactionWriter(expectedBloomFilterSize,
compactionFileLocation, toCompact);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
if (row.isEmpty())
continue;
-
+
long position = writer.append(row);
totalkeysWritten++;
@@ -175,70 +169,32 @@ public class CompactionTask extends Abst
}
}
}
- if (!nni.hasNext() ||
newSSTableSegmentThresholdReached(writer, position))
- {
- SSTableReader toIndex =
writer.closeAndOpenReader(getMaxDataAge(toCompact));
- cachedKeyMap.put(toIndex, cachedKeys);
- sstables.add(toIndex);
- writer =
cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation,
toCompact);
- writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, Long>();
- }
}
+ ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
}
finally
{
iter.close();
if (collector != null)
collector.finishCompaction(ci);
- for (SSTableWriter writer : writers)
+ if (writer != null)
writer.cleanupIfNecessary();
}
- cfs.replaceCompactedSSTables(toCompact, sstables);
- // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (Entry<SSTableReader, Map<DecoratedKey, Long>>
ssTableReaderMapEntry : cachedKeyMap.entrySet())
- {
- SSTableReader key = ssTableReaderMapEntry.getKey();
- for (Entry<DecoratedKey, Long> entry :
ssTableReaderMapEntry.getValue().entrySet())
- key.cacheKey(entry.getKey(), entry.getValue());
- }
-
+ cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
+ for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
+ ssTable.cacheKey(entry.getKey(), entry.getValue());
CompactionManager.instance.submitBackground(cfs);
long dTime = System.currentTimeMillis() - startTime;
long startsize = SSTable.getTotalBytes(toCompact);
- long endsize = SSTable.getTotalBytes(sstables);
+ long endsize = ssTable.length();
double ratio = (double)endsize / (double)startsize;
-
- StringBuilder builder = new StringBuilder();
- builder.append("[");
- for (SSTableReader reader : sstables)
- builder.append(reader.getFilename()).append(",");
- builder.append("]");
-
- double mbps = dTime > 0 ?
(double)endsize/(1024*1024)/((double)dTime/1000) : 0;
- logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMBPS. Time: %,dms.",
- builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
- logger.info(String.format("CF Total Bytes Compacted: %,d",
CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.",
+ ssTable.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
return toCompact.size();
}
- //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
- protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer,
long position)
- {
- return false;
- }
-
- /**
- * extend this if the overridden compaction strategy requires single files to be compacted to function properly
- * @return boolean
- */
- protected boolean allowSingletonCompaction()
- {
- return false;
- }
-
public static long getMaxDataAge(Collection<SSTableReader> sstables)
{
long max = 0;
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri
Aug 26 19:39:04 2011
@@ -24,8 +24,6 @@ import java.io.FileFilter;
import java.io.IOException;
import java.util.*;
-import com.google.common.collect.Ordering;
-import org.apache.cassandra.db.DecoratedKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +74,6 @@ public abstract class SSTable
public final IPartitioner partitioner;
public final boolean compression;
- public DecoratedKey first;
- public DecoratedKey last;
-
protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner
partitioner)
{
this(descriptor, new HashSet<Component>(), metadata, partitioner);
@@ -103,16 +98,6 @@ public abstract class SSTable
this.partitioner = partitioner;
}
- public static final Comparator<SSTableReader> sstableComparator = new
Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return o1.first.compareTo(o2.first);
- }
- };
-
- public static final Ordering<SSTableReader> sstableOrdering =
Ordering.from(sstableComparator);
-
/**
* We use a ReferenceQueue to manage deleting files that have been compacted
* and for which no more SSTable references exist. But this is not guaranteed
@@ -169,8 +154,7 @@ public abstract class SSTable
}
catch (Exception e)
{
- if (!"snapshots".equals(name) && !"backups".equals(name)
- && !name.contains(".json"))
+ if (!"snapshots".equals(name) && !"backups".equals(name))
logger.warn("Invalid file '{}' in data directory {}.", name,
dir);
return null;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Fri Aug 26 19:39:04 2011
@@ -261,7 +261,6 @@ public class SSTableReader extends SSTab
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader input = RandomAccessReader.open(new
File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
- DecoratedKey left = null, right = null;
try
{
if (keyCache != null && keyCache.getCapacity() - keyCache.size() <
keysToLoadInCache.size())
@@ -279,19 +278,10 @@ public class SSTableReader extends SSTab
if (indexPosition == indexSize)
break;
- ByteBuffer key = null, skippedKey;
- skippedKey = ByteBufferUtil.readWithShortLength(input);
-
boolean shouldAddEntry = indexSummary.shouldAddEntry();
- if (shouldAddEntry || cacheLoading || recreatebloom)
- {
- key = skippedKey;
- }
-
- if(null == left)
- left = decodeKey(partitioner, descriptor, skippedKey);
- right = decodeKey(partitioner, descriptor, skippedKey);
-
+ ByteBuffer key = (shouldAddEntry || cacheLoading ||
recreatebloom)
+ ? ByteBufferUtil.readWithShortLength(input)
+ : ByteBufferUtil.skipShortLength(input);
long dataPosition = input.readLong();
if (key != null)
{
@@ -314,8 +304,6 @@ public class SSTableReader extends SSTab
{
FileUtils.closeQuietly(input);
}
- this.first = left;
- this.last = right;
// finalize the state of the reader
ifile =
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1162220&r1=1162219&r2=1162220&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Fri Aug 26 19:39:04 2011
@@ -136,9 +136,6 @@ public class SSTableWriter extends SSTab
private void afterAppend(DecoratedKey decoratedKey, long dataPosition)
throws IOException
{
lastWrittenKey = decoratedKey;
- this.last = lastWrittenKey;
- if(null == this.first)
- this.first = lastWrittenKey;
if (logger.isTraceEnabled())
logger.trace("wrote " + decoratedKey + " at " + dataPosition);
@@ -251,8 +248,6 @@ public class SSTableWriter extends SSTab
iwriter.bf,
maxDataAge,
sstableMetadata);
- sstable.first = this.first;
- sstable.last = this.last;
iwriter = null;
dbuilder = null;
return sstable;