Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Fri May 22 15:33:42 2009
@@ -42,6 +42,7 @@
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -873,4 +874,29 @@
             return hashtable.remove(cannonicalize(filename));
         }
     }
+
+
+    /**
+     * Obtain a ColumnGroupReader for the getColumnSlice call.
+     */
+    public ColumnGroupReader getColumnGroupReader(String key, String cfName,
+                                                  String startColumn, boolean isAscending) throws IOException
+    {
+        ColumnGroupReader reader = null;
+        IFileReader dataReader = SequenceFile.reader(dataFile_);
+
+        try
+        {
+            /* Morph key into actual key based on the partition type. */
+            String decoratedKey = partitioner_.decorateKey(key);
+            Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader, partitioner_);
+            reader = new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, fileCoordinate);
+        }
+        finally
+        {
+            if (dataReader != null)
+                dataReader.close();
+        }
+        return reader;
+    }
 }
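[Editor's note: for orientation, a minimal sketch of how a caller might drive the new reader. The helper method, the row/CF names, and consumeBlock are illustrative assumptions, not part of this commit; imports of SSTable, ColumnGroupReader, and DataOutputBuffer are assumed.]

    // Hypothetical read loop for the new block reader: stream index-range
    // blocks for a slice ascending from "col2" until the reader runs off
    // the end of the column index.
    static void streamSlice(SSTable ssTable, String row) throws IOException
    {
        ColumnGroupReader reader = ssTable.getColumnGroupReader(row, "Standard1", "col2", true);
        DataOutputBuffer bufOut = new DataOutputBuffer();
        try
        {
            // getNextBlock() resets bufOut before writing each block, so the
            // caller only has to consume the buffer between calls; it returns
            // false once the range index moves past either end of the index.
            while (reader.getNextBlock(bufOut))
                consumeBlock(bufOut); // hypothetical deserialization hook
        }
        finally
        {
            reader.close(); // close() assumed inherited via the IFileReader hierarchy
        }
    }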
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Fri May 22 15:33:42 2009
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Arrays;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.BloomFilter;
@@ -532,6 +533,146 @@
         }
     }
 
+
+    /**
+     * This is a reader that finds the block for a starting column and returns
+     * blocks before/after it for each next call. This function assumes that
+     * the CF is sorted by name and exploits the name index.
+     */
+    public static class ColumnGroupReader extends BufferReader
+    {
+        private String key_;
+        private String cfName_;
+        private boolean isAscending_;
+
+        private List<IndexHelper.ColumnIndexInfo> columnIndexList_;
+        private long columnStartPosition_;
+        private int curRangeIndex_;
+        private int allColumnsSize_;
+        private int localDeletionTime_;
+        private long markedForDeleteAt_;
+
+        ColumnGroupReader(String filename, String key, String cfName, String startColumn, boolean isAscending, Coordinate section) throws IOException
+        {
+            super(filename, 128 * 1024);
+            this.cfName_ = cfName;
+            this.key_ = key;
+            this.isAscending_ = isAscending;
+            init(startColumn, section);
+        }
+
+        /**
+         * Build a list of index entries ready for search.
+         */
+        private List<IndexHelper.ColumnIndexInfo> getFullColumnIndexList(List<IndexHelper.ColumnIndexInfo> columnIndexList, int totalNumCols)
+        {
+            if (columnIndexList.size() == 0)
+            {
+                /* if there is no column index, add an index entry that covers the full space. */
+                return Arrays.asList(new IndexHelper.ColumnIndexInfo[]{new IndexHelper.ColumnNameIndexInfo("", 0, totalNumCols)});
+            }
+
+            List<IndexHelper.ColumnIndexInfo> fullColIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            int accumulatedCols = 0;
+            for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList)
+                accumulatedCols += colPosInfo.count();
+            int remainingCols = totalNumCols - accumulatedCols;
+
+            fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo("", 0, columnIndexList.get(0).count()));
+            for (int i = 0; i < columnIndexList.size() - 1; i++)
+            {
+                IndexHelper.ColumnNameIndexInfo colPosInfo = (IndexHelper.ColumnNameIndexInfo)columnIndexList.get(i);
+                fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo(colPosInfo.name(),
+                                                                         colPosInfo.position(),
+                                                                         columnIndexList.get(i + 1).count()));
+            }
+            String columnName = ((IndexHelper.ColumnNameIndexInfo)columnIndexList.get(columnIndexList.size() - 1)).name();
+            fullColIndexList.add(new IndexHelper.ColumnNameIndexInfo(columnName,
+                                                                     columnIndexList.get(columnIndexList.size() - 1).position(),
+                                                                     remainingCols));
+            return fullColIndexList;
+        }
+
+        private void init(String startColumn, Coordinate section) throws IOException
+        {
+            String keyInDisk = null;
+            if (seekTo(key_, section) >= 0)
+                keyInDisk = file_.readUTF();
+
+            if (keyInDisk != null && keyInDisk.equals(key_))
+            {
+                /* read off the size of this row */
+                int dataSize = file_.readInt();
+                /* skip the bloom filter */
+                int totalBytesRead = IndexHelper.skipBloomFilter(file_);
+                /* read off the index flag, it has to be true */
+                boolean hasColumnIndexes = file_.readBoolean();
+                totalBytesRead += 1;
+
+                /* read the index */
+                List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+                if (hasColumnIndexes)
+                    totalBytesRead += IndexHelper.deserializeIndex(cfName_, file_, colIndexList);
+
+                /* need to do two things here.
+                 * 1. move the file pointer to the beginning of the list of stored columns
+                 * 2. calculate the size of all columns */
+                String cfName = file_.readUTF();
+                localDeletionTime_ = file_.readInt();
+                markedForDeleteAt_ = file_.readLong();
+                int totalNumCols = file_.readInt();
+                allColumnsSize_ = dataSize - (totalBytesRead + utfPrefix_ + cfName.length() + 4 + 8 + 4);
+
+                columnStartPosition_ = file_.getFilePointer();
+                columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols);
+
+                int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnNameIndexInfo(startColumn));
+                curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index;
+            }
+            else
+            {
+                /* no keys found in this file because of a false positive in BF */
+                curRangeIndex_ = -1;
+                columnIndexList_ = new ArrayList<IndexHelper.ColumnIndexInfo>();
+            }
+        }
+
+        private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException
+        {
+            if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size())
+                return false;
+            IndexHelper.ColumnIndexInfo curColPosition = columnIndexList_.get(curRangeIndex_);
+            long start = curColPosition.position();
+            long end = curRangeIndex_ < columnIndexList_.size() - 1
+                       ? columnIndexList_.get(curRangeIndex_ + 1).position()
+                       : allColumnsSize_;
+
+            /* seek to the correct offset to the data, and calculate the data size */
+            file_.seek(columnStartPosition_ + start);
+            long dataSize = end - start;
+
+            bufOut.reset();
+            // write CF info
+            bufOut.writeUTF(cfName_);
+            bufOut.writeInt(localDeletionTime_);
+            bufOut.writeLong(markedForDeleteAt_);
+            // now write the columns
+            bufOut.writeInt(curColPosition.count());
+            bufOut.write(file_, (int)dataSize);
+            return true;
+        }
+
+        public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException
+        {
+            boolean result = getBlockFromCurIndex(outBuf);
+            if (isAscending_)
+                curRangeIndex_++;
+            else
+                curRangeIndex_--;
+            return result;
+        }
+    }
+
     public static abstract class AbstractReader implements IFileReader
     {
         private static final short utfPrefix_ = 2;
@@ -705,7 +846,7 @@
          * @param section indicates the location of the block index.
          * @throws IOException
          */
-        private long seekTo(String key, Coordinate section) throws IOException
+        protected long seekTo(String key, Coordinate section) throws IOException
        {
            seek(section.end_);
            long position = getPositionFromBlockIndex(key);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Fri May 22 15:33:42 2009
@@ -35,6 +35,7 @@
 import org.apache.cassandra.db.ColumnsSinceReadCommand;
 import org.apache.cassandra.db.SliceByNamesReadCommand;
 import org.apache.cassandra.db.SliceByRangeReadCommand;
+import org.apache.cassandra.db.SliceFromReadCommand;
 import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
@@ -208,7 +209,26 @@
         }
         return thriftifyColumns(columns);
     }
-
+
+    public List<column_t> get_slice_from(String tablename, String key, String columnFamily_column, boolean isAscending, int count) throws InvalidRequestException
+    {
+        logger.debug("get_slice_from");
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+        if (values.length != 2 || !"Standard".equals(DatabaseDescriptor.getColumnFamilyType(values[0])))
+            throw new InvalidRequestException("get_slice_from requires a standard CF name and a starting column name");
+        if (count <= 0)
+            throw new InvalidRequestException("get_slice_from requires positive count");
+        if ("Name".compareTo(DatabaseDescriptor.getCFMetaData(tablename, values[0]).indexProperty_) != 0)
+            throw new InvalidRequestException("get_slice_from requires CF indexed by name");
+        ColumnFamily cfamily = readColumnFamily(new SliceFromReadCommand(tablename, key, columnFamily_column, isAscending, count));
+        if (cfamily == null)
+        {
+            return EMPTY_COLUMNS;
+        }
+        Collection<IColumn> columns = cfamily.getAllColumns();
+        return thriftifyColumns(columns);
+    }
+
     public column_t get_column(String tablename, String key, String columnFamily_column) throws NotFoundException, InvalidRequestException
     {
         logger.debug("get_column");

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java?rev=777578&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java Fri May 22 15:33:42 2009
@@ -0,0 +1,59 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * reduces equal values from the source iterator to a single (optionally transformed) instance.
+ */
+public abstract class ReducingIterator<T> extends AbstractIterator<T> implements Iterator<T>, Iterable<T>
+{
+    protected Iterator<T> source;
+    protected T last;
+
+    public ReducingIterator(Iterator<T> source)
+    {
+        this.source = source;
+    }
+
+    /** combine this object with the previous ones. intermediate state is up to your implementation. */
+    public abstract void reduce(T current);
+
+    /** return the last object computed by reduce */
+    protected abstract T getReduced();
+
+    /** override this if the keys you want to base the reduce on are not the same as the object itself (but can be generated from it) */
+    protected Object getKey(T o)
+    {
+        return o;
+    }
+
+    protected T computeNext()
+    {
+        if (last == null && !source.hasNext())
+            return endOfData();
+
+        boolean keyChanged = false;
+        while (!keyChanged)
+        {
+            if (last != null)
+                reduce(last);
+            if (!source.hasNext())
+            {
+                last = null;
+                break;
+            }
+            T current = source.next();
+            if (last != null && !getKey(current).equals(getKey(last)))
+                keyChanged = true;
+            last = current;
+        }
+        return getReduced();
+    }
+
+    public Iterator<T> iterator()
+    {
+        return this;
+    }
+}

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=777578&r1=777577&r2=777578&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Fri May 22 15:33:42 2009
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.db;
 
+import java.util.SortedSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
 import org.junit.Test;
 import static junit.framework.Assert.*;
 
@@ -202,4 +207,117 @@
         rm.add(cf);
         return rm;
     }
+
+    @Test
+    public void testGetSliceFromBasic() throws Throwable
+    {
+        Table table = Table.open(TABLE_NAME);
+        String ROW = "row1";
+        RowMutation rm = new RowMutation(TABLE_NAME, ROW);
+        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+        cf.addColumn(new Column("col3", "val3".getBytes(), 1L));
+        cf.addColumn(new Column("col4", "val4".getBytes(), 1L));
+        cf.addColumn(new Column("col5", "val5".getBytes(), 1L));
+        cf.addColumn(new Column("col7", "val7".getBytes(), 1L));
+        cf.addColumn(new Column("col9", "val9".getBytes(), 1L));
+        rm.add(cf);
+        rm.apply();
+
+        rm = new RowMutation(TABLE_NAME, ROW);
+        rm.delete("Standard1:col4", 2L);
+        rm.apply();
+        validateGetSliceFromBasic(table, ROW);
+
+        // flush to disk
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+        validateGetSliceFromBasic(table, ROW);
+    }
+
+    @Test
+    public void testGetSliceFromAdvanced() throws Throwable
+    {
+        Table table = Table.open(TABLE_NAME);
+        String ROW = "row2";
+        RowMutation rm = new RowMutation(TABLE_NAME, ROW);
+        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+        cf.addColumn(new Column("col2", "val2".getBytes(), 1L));
+        cf.addColumn(new Column("col3", "val3".getBytes(), 1L));
+        cf.addColumn(new Column("col4", "val4".getBytes(), 1L));
+        cf.addColumn(new Column("col5", "val5".getBytes(), 1L));
+        cf.addColumn(new Column("col6", "val6".getBytes(), 1L));
+        rm.add(cf);
+        rm.apply();
+        // flush to disk
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+
+        rm = new RowMutation(TABLE_NAME, ROW);
+        cf = new ColumnFamily("Standard1", "Standard");
+        cf.addColumn(new Column("col1", "valx".getBytes(), 2L));
+        cf.addColumn(new Column("col2", "valx".getBytes(), 2L));
+        cf.addColumn(new Column("col3", "valx".getBytes(), 2L));
+        rm.add(cf);
+        rm.apply();
+        validateGetSliceFromAdvanced(table, ROW);
+
+        // flush to disk
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+        validateGetSliceFromAdvanced(table, ROW);
+    }
+
+    private void assertColumns(ColumnFamily columnFamily, String... columnFamilyNames)
+    {
+        assertNotNull(columnFamily);
+        SortedSet<IColumn> columns = columnFamily.getAllColumns();
+        List<String> L = new ArrayList<String>();
+        for (IColumn column : columns)
+        {
+            L.add(column.name());
+        }
+        assert Arrays.equals(L.toArray(new String[columns.size()]), columnFamilyNames);
+    }
+
+    private void validateGetSliceFromAdvanced(Table table, String row) throws Throwable
+    {
+        Row result;
+        ColumnFamily cfres;
+
+        result = table.getSliceFrom(row, "Standard1:col2", true, 3);
+        cfres = result.getColumnFamily("Standard1");
+        assertColumns(cfres, "col2", "col3", "col4");
+        assertEquals(new String(cfres.getColumn("col2").value()), "valx");
+        assertEquals(new String(cfres.getColumn("col3").value()), "valx");
+        assertEquals(new String(cfres.getColumn("col4").value()), "val4");
+    }
+
+    private void validateGetSliceFromBasic(Table table, String row) throws Throwable
+    {
+        Row result;
+        ColumnFamily cf;
+
+        result = table.getSliceFrom(row, "Standard1:col5", true, 2);
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col5", "col7");
+
+        result = table.getSliceFrom(row, "Standard1:col4", true, 2);
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col4", "col5", "col7");
+
+        result = table.getSliceFrom(row, "Standard1:col5", false, 2);
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col3", "col4", "col5");
+
+        result = table.getSliceFrom(row, "Standard1:col6", false, 2);
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf, "col3", "col4", "col5");
+
+        result = table.getSliceFrom(row, "Standard1:col95", true, 2);
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf);
+
+        result = table.getSliceFrom(row, "Standard1:col0", false, 2);
+        cf = result.getColumnFamily("Standard1");
+        assertColumns(cf);
+    }
 }
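[Editor's note: to illustrate the contract of the ReducingIterator added above, a minimal self-contained sketch, not part of the commit. It collapses runs of equal strings into one instance; note that reduce() may be invoked more than once per key, and getReduced() is called once per distinct key.]

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.cassandra.utils.ReducingIterator;

    public class ReducingIteratorExample
    {
        public static void main(String[] args)
        {
            Iterator<String> source = Arrays.asList("a", "a", "b", "c", "c").iterator();
            // keep only the last value of each run of equal elements
            ReducingIterator<String> dedup = new ReducingIterator<String>(source)
            {
                private String reduced;

                public void reduce(String current)
                {
                    reduced = current; // may be called several times per key
                }

                protected String getReduced()
                {
                    return reduced; // emitted once per distinct key
                }
            };
            for (String s : dedup)         // ReducingIterator is also Iterable
                System.out.println(s);     // prints: a, b, c
        }
    }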

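[Editor's note: a sketch of what a Thrift client call against the new get_slice_from endpoint might look like. The table name, row key, port 9160, and the Thrift runtime package paths are assumptions for illustration (runtime packages varied across Thrift releases); consult the generated bindings for the authoritative signatures.]

    import java.util.List;
    import org.apache.cassandra.service.Cassandra;
    import org.apache.cassandra.service.column_t;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class GetSliceFromExample
    {
        public static void main(String[] args) throws Exception
        {
            // connect to a local node; host and port are assumptions
            TTransport transport = new TSocket("localhost", 9160);
            transport.open();
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));

            // fetch up to 3 columns of Table1.Standard1, ascending from col2
            List<column_t> columns = client.get_slice_from("Table1", "row1", "Standard1:col2", true, 3);
            for (column_t column : columns)
                System.out.println(column); // rely on the generated struct's toString()

            transport.close();
        }
    }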