Updated Branches: refs/heads/cassandra-1.1 5d5207b91 -> 6f31aba0e refs/heads/trunk 0f255da32 -> 8cf1259b3
merge from 1.1 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8cf1259b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8cf1259b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8cf1259b Branch: refs/heads/trunk Commit: 8cf1259b367c5b910ef70996191dc10ef0987657 Parents: 0f255da 6f31aba Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 24 14:45:05 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Oct 24 14:48:22 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/RowIteratorFactory.java | 43 ++++++++-- .../db/columniterator/IColumnIteratorFactory.java | 6 ++ .../db/columniterator/LazyColumnIterator.java | 62 +++++++++++++++ .../cassandra/io/sstable/SSTableScanner.java | 18 +++- 5 files changed, 117 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8cf1259b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index e6c1d1a,0857259..3ef9530 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,41 -1,5 +1,42 @@@ -1.1.7 +1.2-beta2 + * make TRACE verb droppable (CASSANDRA-4672) + * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755) + * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793) + * Make hint delivery asynchronous (CASSANDRA-4761) + * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610) + * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661) + * Add ability to use custom TServerFactory implementations (CASSANDRA-4608) + * optimize batchlog flushing to skip successful batches (CASSANDRA-4667) + * include metadata for system keyspace itself in schema tables (CASSANDRA-4416) + * add check to PropertyFileSnitch to verify presence of location for + local node (CASSANDRA-4728) + * add PBSPredictor consistency modeler (CASSANDRA-4261) + * remove vestiges of Thrift unframed mode (CASSANDRA-4729) + * optimize single-row PK lookups (CASSANDRA-4710) + * adjust blockFor calculation to account for pending ranges due to node + movement (CASSANDRA-833) + * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649) + * (CQL3) Make prepared statement global instead of per connection + (CASSANDRA-4449) + * Fix scrubbing of CQL3 created tables (CASSANDRA-4685) + * (CQL3) Fix validation when using counter and regular columns in the same + table (CASSANDRA-4706) + * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648) + * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738) + * Add support for multiple column family outputs in CFOF (CASSANDRA-4208) + * Support repairing only the local DC nodes (CASSANDRA-4747) + * Use rpc_address for binary protocol and change default port (CASSANRA-4751) + * Fix use of collections in prepared statements (CASSANDRA-4739) + * Store more information into peers table (CASSANDRA-4351, 4814) + * Configurable bucket size for size tiered compaction (CASSANDRA-4704) + * Run leveled compaction in parallel (CASSANDRA-4310) + * Fix potential NPE during CFS reload (CASSANDRA-4786) + * Composite indexes may miss results (CASSANDRA-4796) + * Move consistency level to the protocol level (CASSANDRA-4734, 4824) + * Fix Subcolumn slice ends not respected (CASSANDRA-4826) + * Fix Assertion error in cql3 select (CASSANDRA-4783) +Merged from 1.1: + * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816) * fix indexing empty column values (CASSANDRA-4832) * allow JdbcDate to compose null Date objects (CASSANDRA-4830) * fix possible stackoverflow when compacting 1000s of sstables http://git-wip-us.apache.org/repos/asf/cassandra/blob/8cf1259b/src/java/org/apache/cassandra/db/RowIteratorFactory.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/RowIteratorFactory.java index 547e27d,5a53c4a..229a08f --- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java +++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java @@@ -19,9 -19,9 +19,17 @@@ package org.apache.cassandra.db import java.util.*; - import com.google.common.collect.AbstractIterator; ++<<<<<<< HEAD + +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; ++||||||| merged common ancestors ++ ++import org.apache.cassandra.db.columniterator.IColumnIterator; ++======= + import org.apache.cassandra.db.columniterator.IColumnIterator; + import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; + import org.apache.cassandra.db.columniterator.LazyColumnIterator; ++>>>>>>> cassandra-1.1 import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; @@@ -122,7 -122,7 +130,7 @@@ public class RowIteratorFactor /** * Get a ColumnIterator for a specific key in the memtable. */ - private static class ConvertToColumnIterator extends AbstractIterator<OnDiskAtomIterator> implements CloseableIterator<OnDiskAtomIterator> - private static class ConvertToColumnIterator implements CloseableIterator<IColumnIterator> ++ private static class ConvertToColumnIterator implements CloseableIterator<OnDiskAtomIterator> { private final QueryFilter filter; private final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter; @@@ -133,14 -133,33 +141,33 @@@ this.iter = iter; } - public OnDiskAtomIterator computeNext() + public boolean hasNext() { - if (iter.hasNext()) + return iter.hasNext(); + } + + /* + * Note that when doing get_paged_slice, we reset the start of the queryFilter after we've fetched the + * first row. This means that this iterator should not use in any way the filter to fetch a row before + * we call next(). Which prevents us for using guava AbstractIterator. + * This is obviously rather fragile and we should consider refactoring that code, but such refactor will go + * deep into the storage engine code so this will have to do until then. + */ - public IColumnIterator next() ++ public OnDiskAtomIterator next() + { + final Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next(); + return new LazyColumnIterator(entry.getKey(), new IColumnIteratorFactory() { - Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next(); - return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey()); - } - return endOfData(); - public IColumnIterator create() ++ public OnDiskAtomIterator create() + { + return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey()); + } + }); + } + + public void remove() + { + throw new UnsupportedOperationException(); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8cf1259b/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java index 0000000,c9ce857..91fcdd8 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java +++ b/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java @@@ -1,0 -1,6 +1,6 @@@ + package org.apache.cassandra.db.columniterator; + + public interface IColumnIteratorFactory + { - IColumnIterator create(); ++ OnDiskAtomIterator create(); + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8cf1259b/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java index 0000000,486836d..aa2c188 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java @@@ -1,0 -1,62 +1,62 @@@ + package org.apache.cassandra.db.columniterator; + + import com.google.common.collect.AbstractIterator; + import org.apache.cassandra.db.ColumnFamily; + import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IColumn; ++import org.apache.cassandra.db.OnDiskAtom; + + import java.io.IOException; + + + /* - * The goal of this encapsulating IColumnIterator is to delay the use of ++ * The goal of this encapsulating OnDiskAtomIterator is to delay the use of + * the filter until columns are actually queried. + * The reason for that is get_paged_slice because it change the start of + * the filter after having seen the first row, and so we must not use the + * filter before the row data is actually queried. However, mergeIterator + * needs to "fetch" a row in advance. But all it needs is the key and so + * this IColumnIterator make sure getKey() can be called without triggering + * the use of the filter itself. + */ -public class LazyColumnIterator extends AbstractIterator<IColumn> implements IColumnIterator ++public class LazyColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator + { + private final DecoratedKey key; + private final IColumnIteratorFactory subIteratorFactory; + - private IColumnIterator subIterator; ++ private OnDiskAtomIterator subIterator; + + public LazyColumnIterator(DecoratedKey key, IColumnIteratorFactory subIteratorFactory) + { + this.key = key; + this.subIteratorFactory = subIteratorFactory; + } + - private IColumnIterator getSubIterator() ++ private OnDiskAtomIterator getSubIterator() + { + if (subIterator == null) + subIterator = subIteratorFactory.create(); + return subIterator; + } + - protected IColumn computeNext() ++ protected OnDiskAtom computeNext() + { + getSubIterator(); + return subIterator.hasNext() ? subIterator.next() : endOfData(); + } + + public ColumnFamily getColumnFamily() + { + return getSubIterator().getColumnFamily(); + } + + public DecoratedKey getKey() + { + return key; + } + + public void close() throws IOException + { + if (subIterator != null) + subIterator.close(); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8cf1259b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 6566387,d3bc0b3..22ac485 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@@ -21,14 -24,17 +21,16 @@@ import java.io.IOException import java.util.Arrays; import java.util.Iterator; - import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; + import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; + import org.apache.cassandra.db.columniterator.LazyColumnIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; + import org.apache.cassandra.db.compaction.ICompactionScanner; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.db.filter.QueryFilter; - import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.FileUtils; + import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; public class SSTableScanner implements ICompactionScanner @@@ -210,33 -173,32 +212,39 @@@ { try { - DecoratedKey currentKey; - RowIndexEntry currentEntry; - if (row != null) - file.seek(finishedAt); - assert !file.isEOF(); - - final DecoratedKey<?> key = SSTableReader.decodeKey(sstable.partitioner, - sstable.descriptor, - ByteBufferUtil.readWithShortLength(file)); - long dataSize = SSTableReader.readRowSize(file, sstable.descriptor); - long dataStart = file.getFilePointer(); - finishedAt = dataStart + dataSize; ++ final DecoratedKey currentKey; ++ final RowIndexEntry currentEntry; - if (filter == null) + if (row == null) { - row = new SSTableIdentityIterator(sstable, file, key, dataStart, dataSize); - return row; + currentKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile)); + currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version); } else { - return row = new LazyColumnIterator(key, new IColumnIteratorFactory() - { - public IColumnIterator create() - { - return filter.getSSTableColumnIterator(sstable, file, key); - } - }); + currentKey = nextKey; + currentEntry = nextEntry; } + + if (ifile.isEOF()) + { + nextKey = null; + nextEntry = null; + } + else + { + nextKey = sstable.decodeKey(ByteBufferUtil.readWithShortLength(ifile)); + nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version); + } + + assert !dfile.isEOF(); - return row = filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); ++ return row = new LazyColumnIterator(currentKey, new IColumnIteratorFactory() ++ { ++ public OnDiskAtomIterator create() ++ { ++ return filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); ++ } ++ }); } catch (IOException e) {
