Author: jbellis
Date: Fri Aug 26 21:31:35 2011
New Revision: 1162266
URL: http://svn.apache.org/viewvc?rev=1162266&view=rev
Log:
wip
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java Fri Aug 26 21:31:35 2011
@@ -23,16 +23,15 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.io.sstable.SSTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CloseableIterator;
@@ -42,26 +41,24 @@ public class CollationController
{
private static Logger logger =
LoggerFactory.getLogger(CollationController.class);
- private final DataTracker.View dataview;
+ private final ColumnFamilyStore cfs;
private final ISortedColumns.Factory factory;
private final QueryFilter filter;
private final int gcBefore;
- private final CFMetaData metadata;
private int sstablesIterated = 0;
- public CollationController(DataTracker.View dataview,
ISortedColumns.Factory factory, QueryFilter filter, CFMetaData metadata, int
gcBefore)
+ public CollationController(ColumnFamilyStore cfs, ISortedColumns.Factory
factory, QueryFilter filter, int gcBefore)
{
- this.dataview = dataview;
+ this.cfs = cfs;
this.factory = factory;
this.filter = filter;
this.gcBefore = gcBefore;
- this.metadata = metadata;
}
public ColumnFamily getTopLevelColumns()
{
- return filter.filter instanceof NamesQueryFilter &&
metadata.getDefaultValidator() != CounterColumnType.instance
+ return filter.filter instanceof NamesQueryFilter &&
cfs.metadata.getDefaultValidator() != CounterColumnType.instance
? collectTimeOrderedData()
: collectAllData();
}
@@ -74,14 +71,18 @@ public class CollationController
private ColumnFamily collectTimeOrderedData()
{
logger.debug("collectTimeOrderedData");
- List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
- final ColumnFamily container = ColumnFamily.create(metadata, factory,
filter.filter.isReversed());
- List<SSTableReader> sstables = null;
- try
- {
+
+ List<IColumnIterator> iterators;
+ ColumnFamily container;
+ while (true)
+ {
+ DataTracker.View dataview = cfs.getDataTracker().getView();
+ iterators = new ArrayList<IColumnIterator>();
+ container = ColumnFamily.create(cfs.metadata, factory,
filter.filter.isReversed());
+ List<SSTableReader> sstables;
for (Memtable memtable :
Iterables.concat(dataview.memtablesPendingFlush,
Collections.singleton(dataview.memtable)))
{
- IColumnIterator iter =
filter.getMemtableColumnIterator(memtable, metadata.comparator);
+ IColumnIterator iter =
filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator);
if (iter != null)
{
iterators.add(iter);
@@ -93,49 +94,58 @@ public class CollationController
// avoid changing the filter columns of the original filter
// (reduceNameFilter removes columns that are known to be
irrelevant)
- TreeSet<ByteBuffer> filterColumns = new
TreeSet<ByteBuffer>(metadata.comparator);
+ TreeSet<ByteBuffer> filterColumns = new
TreeSet<ByteBuffer>(cfs.metadata.comparator);
filterColumns.addAll(((NamesQueryFilter) filter.filter).columns);
QueryFilter reducedFilter = new QueryFilter(filter.key,
filter.path, new NamesQueryFilter(filterColumns));
/* add the SSTables on disk */
sstables = dataview.intervalTree.search(new Interval(filter.key,
filter.key));
Collections.sort(sstables, SSTable.maxTimestampComparator);
- SSTableReader.acquireReferences(sstables);
- // read sorted sstables
- for (SSTableReader sstable : sstables)
- {
- long currentMaxTs = sstable.getMaxTimestamp();
- reduceNameFilter(reducedFilter, container, currentMaxTs);
- if (((NamesQueryFilter)
reducedFilter.filter).columns.isEmpty())
- break;
-
- IColumnIterator iter =
reducedFilter.getSSTableColumnIterator(sstable);
- iterators.add(iter);
- if (iter.getColumnFamily() != null)
+ if (!SSTableReader.acquireReferences(sstables))
+ continue; // retry w/ new view
+
+ try
+ {
+ // read sorted sstables
+ for (SSTableReader sstable : sstables)
{
- container.delete(iter.getColumnFamily());
- sstablesIterated++;
- while (iter.hasNext())
- container.addColumn(iter.next());
+ long currentMaxTs = sstable.getMaxTimestamp();
+ reduceNameFilter(reducedFilter, container, currentMaxTs);
+ if (((NamesQueryFilter)
reducedFilter.filter).columns.isEmpty())
+ break;
+
+ IColumnIterator iter =
reducedFilter.getSSTableColumnIterator(sstable);
+ iterators.add(iter);
+ if (iter.getColumnFamily() != null)
+ {
+ container.delete(iter.getColumnFamily());
+ sstablesIterated++;
+ while (iter.hasNext())
+ container.addColumn(iter.next());
+ }
}
}
- }
- finally
- {
- SSTableReader.releaseReferences(sstables);
- for (IColumnIterator iter : iterators)
- FileUtils.closeQuietly(iter);
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ for (IColumnIterator iter : iterators)
+ FileUtils.closeQuietly(iter);
+ }
+
+ break; // sstable reference acquisition was successful
}
+
// we need to distinguish between "there is no data at all for this
row" (BF will let us rebuild that efficiently)
// and "there used to be data, but it's gone now" (we should cache the
empty CF so we don't need to rebuild that slower)
if (iterators.isEmpty())
return null;
// do a final collate. toCollate is boilerplate required to provide a
CloseableIterator
+ final ColumnFamily c2 = container;
CloseableIterator<IColumn> toCollate = new
SimpleAbstractColumnIterator()
{
- final Iterator<IColumn> iter = container.iterator();
+ final Iterator<IColumn> iter = c2.iterator();
protected IColumn computeNext()
{
@@ -144,7 +154,7 @@ public class CollationController
public ColumnFamily getColumnFamily()
{
- return container;
+ return c2;
}
public DecoratedKey getKey()
@@ -153,7 +163,7 @@ public class CollationController
}
};
ColumnFamily returnCF = container.cloneMeShallow();
- filter.collateColumns(returnCF, Collections.singletonList(toCollate),
metadata.comparator, gcBefore);
+ filter.collateColumns(returnCF, Collections.singletonList(toCollate),
cfs.metadata.comparator, gcBefore);
// Caller is responsible for final removeDeletedCF. This is important
for cacheRow to work correctly:
return returnCF;
@@ -187,14 +197,15 @@ public class CollationController
{
logger.debug("collectAllData");
List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
- ColumnFamily returnCF = ColumnFamily.create(metadata, factory,
filter.filter.isReversed());
- List<SSTableReader> sstables = null;
+ ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory,
filter.filter.isReversed());
+ List<SSTableReader> sstables;
- try
+ while (true)
{
+ DataTracker.View dataview = cfs.getDataTracker().getView();
for (Memtable memtable :
Iterables.concat(dataview.memtablesPendingFlush,
Collections.singleton(dataview.memtable)))
{
- IColumnIterator iter =
filter.getMemtableColumnIterator(memtable, metadata.comparator);
+ IColumnIterator iter =
filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator);
if (iter != null)
{
returnCF.delete(iter.getColumnFamily());
@@ -204,23 +215,30 @@ public class CollationController
/* add the SSTables on disk */
sstables = dataview.intervalTree.search(new Interval(filter.key,
filter.key));
- SSTableReader.acquireReferences(sstables);
- for (SSTableReader sstable : sstables)
+ if (!SSTableReader.acquireReferences(sstables))
+ continue; // retry w/ new view
+
+ try
{
- IColumnIterator iter =
filter.getSSTableColumnIterator(sstable);
- iterators.add(iter);
- if (iter.getColumnFamily() != null)
+ for (SSTableReader sstable : sstables)
{
- returnCF.delete(iter.getColumnFamily());
- sstablesIterated++;
+ IColumnIterator iter =
filter.getSSTableColumnIterator(sstable);
+ iterators.add(iter);
+ if (iter.getColumnFamily() != null)
+ {
+ returnCF.delete(iter.getColumnFamily());
+ sstablesIterated++;
+ }
}
}
- }
- finally
- {
- SSTableReader.releaseReferences(sstables);
- for (IColumnIterator iter : iterators)
- FileUtils.closeQuietly(iter);
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ for (IColumnIterator iter : iterators)
+ FileUtils.closeQuietly(iter);
+ }
+
+ break; // sstable reference acquisition was successful
}
// we need to distinguish between "there is no data at all for this
row" (BF will let us rebuild that efficiently)
@@ -228,7 +246,7 @@ public class CollationController
if (iterators.isEmpty())
return null;
- filter.collateColumns(returnCF, iterators, metadata.comparator,
gcBefore);
+ filter.collateColumns(returnCF, iterators, cfs.metadata.comparator,
gcBefore);
// Caller is responsible for final removeDeletedCF. This is important
for cacheRow to work correctly:
return returnCF;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 26 21:31:35 2011
@@ -888,21 +888,14 @@ public class ColumnFamilyStore implement
*/
public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends
SSTable> sstablesToIgnore)
{
- DataTracker.View currentView = markCurrentViewReferenced();
- try
+ // we don't need to acquire references here, since the bloom filter is
safe to use even post-compaction
+ List<SSTableReader> filteredSSTables =
data.getView().intervalTree.search(new Interval(key, key));
+ for (SSTableReader sstable : filteredSSTables)
{
- List<SSTableReader> filteredSSTables =
currentView.intervalTree.search(new Interval(key, key));
- for (SSTableReader sstable : filteredSSTables)
- {
- if (!sstablesToIgnore.contains(sstable) &&
sstable.getBloomFilter().isPresent(key.key))
- return true;
- }
- return false;
- }
- finally
- {
- SSTableReader.releaseReferences(currentView.sstables);
+ if (!sstablesToIgnore.contains(sstable) &&
sstable.getBloomFilter().isPresent(key.key))
+ return true;
}
+ return false;
}
/*
@@ -1261,16 +1254,8 @@ public class ColumnFamilyStore implement
while (true)
{
DataTracker.View currentView = data.getView();
- SSTableReader.acquireReferences(currentView.sstables);
- if (currentView.sstables == data.getView().sstables) // reference
equality is fine
- {
+ if (SSTableReader.acquireReferences(currentView.sstables))
return currentView;
- }
- else
- {
- // the set of sstables has changed, let's release the acquired
references and try again
- SSTableReader.releaseReferences(currentView.sstables);
- }
}
}
@@ -1287,20 +1272,12 @@ public class ColumnFamilyStore implement
private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore,
ISortedColumns.Factory factory)
{
- DataTracker.View currentView = markCurrentViewReferenced();
- try
- {
- CollationController controller = new CollationController(currentView,
factory, filter, metadata, gcBefore);
+ CollationController controller = new CollationController(this,
factory, filter, gcBefore);
ColumnFamily columns = controller.getTopLevelColumns();
recentSSTablesPerRead.add(controller.getSstablesIterated());
sstablesPerRead.add(controller.getSstablesIterated());
return columns;
}
- finally
- {
- SSTableReader.releaseReferences(currentView.sstables);
- }
- }
/**
* Fetch a range of rows and columns from memtables/sstables.
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Aug 26 21:31:35 2011
@@ -298,16 +298,8 @@ public class DataTracker
if (logger.isDebugEnabled())
logger.debug(String.format("removing %s from list of files
tracked for %s.%s",
sstable.descriptor, cfstore.table.name,
cfstore.getColumnFamilyName()));
- // A reference must be acquire before any call to markCompacted,
see SSTableReader for details
- sstable.acquireReference();
- try
- {
- sstable.markCompacted();
- }
- finally
- {
- sstable.releaseReference();
- }
+ sstable.markCompacted();
+ sstable.releaseReference();
liveSize.addAndGet(-sstable.bytesOnDisk());
}
}
@@ -511,9 +503,9 @@ public class DataTracker
// Obviously, dropping sstables whose max column timestamp happens to
be equal to another's
// is not acceptable for us. So, we use a List instead.
public final List<SSTableReader> sstables;
- public final IntervalTree intervalTree;
+ public final IntervalTree<SSTableReader> intervalTree;
- View(Memtable memtable, Set<Memtable> pendingFlush,
List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree
intervalTree)
+ View(Memtable memtable, Set<Memtable> pendingFlush,
List<SSTableReader> sstables, Set<SSTableReader> compacting,
IntervalTree<SSTableReader> intervalTree)
{
this.memtable = memtable;
this.memtablesPendingFlush = pendingFlush;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Aug 26 21:31:35 2011
@@ -798,6 +798,8 @@ public class CompactionManager implement
throw new AssertionError(e);
}
+ // we don't mark validating sstables as compacting in DataTracker, so
we have to mark them referenced
+ // instead so they won't be cleaned up if they do get compacted during
the validation
Collection<SSTableReader> sstables =
cfs.markCurrentSSTablesReferenced();
CompactionIterable ci = new ValidationCompactionIterable(cfs,
sstables, validator.request.range);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Aug 26 21:31:35 2011
@@ -81,7 +81,9 @@ public class SSTableReader extends SSTab
private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
- private final AtomicInteger references = new AtomicInteger(0);
+ private final AtomicInteger references = new AtomicInteger(1);
+ // technically isCompacted is not necessary since it should never be
unreferenced unless it is also compacted,
+ // but it seems like a good extra layer of protection against reference
counting bugs to not delete data based on that alone
private final AtomicBoolean isCompacted = new AtomicBoolean(false);
private final SSTableDeletingTask deletingTask;
@@ -618,9 +620,16 @@ public class SSTableReader extends SSTab
return dfile.length;
}
- public void acquireReference()
+ public boolean acquireReference()
{
- references.incrementAndGet();
+ while (true)
+ {
+ int n = references.get();
+ if (n <= 0)
+ return false;
+ if (references.compareAndSet(n, n + 1))
+ return true;
+ }
}
public void releaseReference()
@@ -831,13 +840,32 @@ public class SSTableReader extends SSTab
: RandomAccessReader.open(new File(getFilename()), bufferSize,
skipIOCache);
}
- public static void acquireReferences(Iterable<SSTableReader> sstables)
+ /**
+ * @param sstables
+ * @return true if all desired references were acquired. Otherwise, it
will unreference any partial acquisition, and return false.
+ */
+ public static boolean acquireReferences(Iterable<SSTableReader> sstables)
{
+ SSTableReader failed = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (!sstable.acquireReference())
+ {
+ failed = sstable;
+ break;
+ }
+ }
+
+ if (failed == null)
+ return true;
+
for (SSTableReader sstable : sstables)
{
- if (sstable != null)
- sstable.acquireReference();
+ if (sstable == failed)
+ break;
+ sstable.releaseReference();
}
+ return false;
}
public static void releaseReferences(Iterable<SSTableReader> sstables)
@@ -846,10 +874,9 @@ public class SSTableReader extends SSTab
{
try
{
- if (sstable != null)
- sstable.releaseReference();
+ sstable.releaseReference();
}
- catch (Throwable ex)
+ catch (Exception ex)
{
logger.error("Failed releasing reference on " + sstable, ex);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Fri Aug 26 21:31:35 2011
@@ -125,7 +125,6 @@ public class StreamInSession
{
if (files.isEmpty())
{
- // wait for bloom filters and row indexes to finish building
HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new
HashMap<ColumnFamilyStore, List<SSTableReader>>();
List<SSTableReader> referenced = new LinkedList<SSTableReader>();
try
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Fri Aug 26 21:31:35 2011
@@ -234,7 +234,6 @@ public class SSTableUtils
long start = System.currentTimeMillis();
while (appender.append(writer)) { /* pass */ }
SSTableReader reader = writer.closeAndOpenReader();
- reader.acquireReference();
// mark all components for removal
if (cleanup)
for (Component component : reader.components)
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Aug 26 21:31:35 2011
@@ -296,7 +296,8 @@ public class StreamingTransferTest exten
ranges.add(new Range(secondtolast.getKey().token,
p.getMinimumToken()));
// Acquiring references, transferSSTables needs it
- SSTableReader.acquireReferences(ssTableReaders);
+ if (!SSTableReader.acquireReferences(ssTableReaders))
+ throw new AssertionError();
StreamOutSession session = StreamOutSession.create(keyspace, LOCAL,
null);
StreamOut.transferSSTables(session, ssTableReaders, ranges,
OperationType.BOOTSTRAP);