http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bed5d39..8fb83af 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -47,19 +47,14 @@ import org.apache.cassandra.cache.*; import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; import org.apache.cassandra.config.CFMetaData.SpeculativeRetry; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.*; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.filter.ExtendedFilter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; @@ -76,7 +71,6 @@ import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamLockfile; -import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.*; import org.apache.cassandra.utils.TopKSampler.SamplerResult; @@ -592,12 +586,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { if (def.isIndexed()) { - CellNameType indexComparator = SecondaryIndex.getIndexComparator(metadata, def); - if (indexComparator != null) - { - CFMetaData indexMetadata = CFMetaData.newIndexMetadata(metadata, def, indexComparator); + CFMetaData indexMetadata = SecondaryIndex.newIndexMetadata(metadata, def); + if (indexMetadata != null) scrubDataDirectories(indexMetadata); - } } } } @@ -1220,7 +1211,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return; RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key); - invalidateCachedRow(cacheKey); + invalidateCachedPartition(cacheKey); } /** @@ -1230,11 +1221,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * param @ key - key for update/insert * param @ columnFamily - columnFamily changes */ - public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition) + public void apply(PartitionUpdate update, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition) { long start = System.nanoTime(); Memtable mt = data.getMemtableFor(opGroup, replayPosition); - final long timeDelta = mt.put(key, columnFamily, indexer, opGroup); + long timeDelta = mt.put(update, indexer, opGroup); + DecoratedKey key = update.partitionKey(); maybeUpdateRowCache(key); 
metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), key.hashCode(), 1); metric.writeLatency.addNano(System.nanoTime() - start); @@ -1243,93 +1235,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * Purges gc-able top-level and range tombstones, returning `cf` if there are any columns or tombstones left, - * null otherwise. - * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged - */ - public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore) - { - // purge old top-level and range tombstones - cf.purgeTombstones(gcBefore); - - // if there are no columns or tombstones left, return null - return !cf.hasColumns() && !cf.isMarkedForDelete() ? null : cf; - } - - /** - * Removes deleted columns and purges gc-able tombstones. - * @return an updated `cf` if any columns or tombstones remain, null otherwise - */ - public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore) - { - return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater); - } - - /* - This is complicated because we need to preserve deleted columns and columnfamilies - until they have been deleted for at least GC_GRACE_IN_SECONDS. But, we do not need to preserve - their contents; just the object itself as a "tombstone" that can be used to repair other - replicas that do not know about the deletion. - */ - public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) - { - if (cf == null) - { - return null; - } - - return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore); - } - - /** - * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or - * columns that have been dropped from the schema (for CQL3 tables only). - * @return the updated ColumnFamily - */ - public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) - { - BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator(); - DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester(); - boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty(); - while (iter.hasNext()) - { - Cell c = iter.next(); - // remove columns if - // (a) the column itself is gcable or - // (b) the column is shadowed by a CF tombstone - // (c) the column has been dropped from the CF schema (CQL3 tables only) - if (c.getLocalDeletionTime() < gcBefore || tester.isDeleted(c) || (hasDroppedColumns && isDroppedColumn(c, cf.metadata()))) - { - iter.remove(); - indexer.remove(c); - } - } - iter.commit(); - return cf; - } - - // returns true if - // 1. this column has been dropped from schema and - // 2. if it has been re-added since then, this particular column was inserted before the last drop - private static boolean isDroppedColumn(Cell c, CFMetaData meta) - { - Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName(meta)); - return droppedAt != null && c.timestamp() <= droppedAt; - } - - private void removeDroppedColumns(ColumnFamily cf) - { - if (cf == null || cf.metadata.getDroppedColumns().isEmpty()) - return; - - BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator(); - while (iter.hasNext()) - if (isDroppedColumn(iter.next(), metadata)) - iter.remove(); - iter.commit(); - } - - /** * @param sstables * @return sstables whose key range overlaps with that of the given sstables, not including itself. * (The given sstables may or may not overlap with each other.) 
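[Illustration, not part of the patch] The removed removeDeletedCF/removeDeleted helpers above hinge on the gcBefore cutoff documented in their javadoc: a tombstone only becomes purgeable once its local deletion time falls before now minus gc_grace_seconds, which is what the new gcBefore(int nowInSec) later in this file's diff computes. A minimal, self-contained Java sketch of that rule follows; the Tombstone class and its field are invented stand-ins for this note, not types from the patch.

// Illustration only: Tombstone is a stand-in, not Cassandra's real type.
final class GcGraceSketch
{
    static final class Tombstone
    {
        final int localDeletionTime; // seconds since epoch when the deletion was recorded
        Tombstone(int localDeletionTime) { this.localDeletionTime = localDeletionTime; }
    }

    // Same shape as the new ColumnFamilyStore#gcBefore(int nowInSec).
    static int gcBefore(int nowInSec, int gcGraceSeconds)
    {
        return nowInSec - gcGraceSeconds;
    }

    // A tombstone may be purged once it has been deleted for at least gc_grace_seconds,
    // i.e. its local deletion time falls strictly before the gcBefore cutoff.
    static boolean purgeable(Tombstone t, int nowInSec, int gcGraceSeconds)
    {
        return t.localDeletionTime < gcBefore(nowInSec, gcGraceSeconds);
    }

    public static void main(String[] args)
    {
        int now = 1_700_000_000;
        int gcGrace = 864_000; // 10 days, the default gc_grace_seconds
        System.out.println(purgeable(new Tombstone(now - 900_000), now, gcGrace)); // true: past the grace period
        System.out.println(purgeable(new Tombstone(now - 3_600), now, gcGrace));   // false: still within grace
    }
}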
@@ -1348,7 +1253,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Set<SSTableReader> results = null; for (SSTableReader sstable : sstables) { - Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last))); + Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last))); results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy(); } results = Sets.difference(results, ImmutableSet.copyOf(sstables)); @@ -1532,9 +1437,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return valid; } - - - /** * Package protected for access from the CompactionManager. */ @@ -1553,249 +1455,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return data.getUncompacting(); } - public ColumnFamily getColumnFamily(DecoratedKey key, - Composite start, - Composite finish, - boolean reversed, - int limit, - long timestamp) - { - return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp)); - } - - /** - * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it - * - * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk - * - * If row is not cached, we figure out what filter is "biggest", read that from disk, then - * filter the result and either cache that or return it. - * - * @param cfId the column family to read the row from - * @param filter the columns being queried. - * @return the requested data for the filter provided - */ - private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter) - { - assert isRowCacheEnabled() - : String.format("Row cache is not enabled on table [" + name + "]"); - - RowCacheKey key = new RowCacheKey(cfId, filter.key); - - // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our - // (now potentially obsolete) data, but won't cache it. 
see CASSANDRA-3862 - // TODO: don't evict entire rows on writes (#2864) - IRowCacheEntry cached = CacheService.instance.rowCache.get(key); - if (cached != null) - { - if (cached instanceof RowCacheSentinel) - { - // Some other read is trying to cache the value, just do a normal non-caching read - Tracing.trace("Row cache miss (race)"); - metric.rowCacheMiss.inc(); - return getTopLevelColumns(filter, Integer.MIN_VALUE); - } - - ColumnFamily cachedCf = (ColumnFamily)cached; - if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp)) - { - metric.rowCacheHit.inc(); - Tracing.trace("Row cache hit"); - return filterColumnFamily(cachedCf, filter); - } - - metric.rowCacheHitOutOfRange.inc(); - Tracing.trace("Ignoring row cache as cached value could not satisfy query"); - return getTopLevelColumns(filter, Integer.MIN_VALUE); - } - - metric.rowCacheMiss.inc(); - Tracing.trace("Row cache miss"); - RowCacheSentinel sentinel = new RowCacheSentinel(); - boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel); - ColumnFamily data = null; - ColumnFamily toCache = null; - try - { - // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing - if (metadata.getCaching().rowCache.cacheFullPartitions()) - { - data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE); - toCache = data; - Tracing.trace("Populating row cache with the whole partition"); - if (sentinelSuccess && toCache != null) - CacheService.instance.rowCache.replace(key, sentinel, toCache); - return filterColumnFamily(data, filter); - } - - // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query - // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said - // filter so we can populate the cache but only if: - // 1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user. - // 2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the - // amount of extra work we'll do on a user query for the purpose of populating the cache). - // - // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the - // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be - // bogus to compare the filter count to the 'rows to cache' otherwise). - if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator)) - { - SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter; - int rowsToCache = metadata.getCaching().rowCache.rowsToCache; - - SliceQueryFilter cacheSlice = readFilterForCache(); - QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp); - - // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the - // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what - // needs to be cached afterwards. 
- if (sliceFilter.count < rowsToCache) - { - toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE); - if (toCache != null) - { - Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted()); - data = filterColumnFamily(toCache, filter); - } - } - else - { - data = getTopLevelColumns(filter, Integer.MIN_VALUE); - if (data != null) - { - // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty - // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it - // (otherwise a cache hit would assume the whole partition is cached which is not the case). - if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache) - { - toCache = filterColumnFamily(data, cacheFilter); - Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count); - } - else - { - Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache); - } - } - } - - if (sentinelSuccess && toCache != null) - CacheService.instance.rowCache.replace(key, sentinel, toCache); - return data; - } - else - { - Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition"); - return getTopLevelColumns(filter, Integer.MIN_VALUE); - } - } - finally - { - if (sentinelSuccess && toCache == null) - invalidateCachedRow(key); - } - } - - public SliceQueryFilter readFilterForCache() - { - // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable. - return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size()); - } - - public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now) + public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec) { // We can use the cached value only if we know that no data it doesn't contain could be covered // by the query filter, that is if: // 1) either the whole partition is cached - // 2) or we can ensure than any data the filter selects are in the cached partition - - // When counting rows to decide if the whole row is cached, we should be careful with expiring - // columns: if we use a timestamp newer than the one that was used when populating the cache, we might - // end up deciding the whole partition is cached when it's really not (just some rows expired since the - // cf was cached). This is the reason for Integer.MIN_VALUE below. - boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getCaching().rowCache.rowsToCache; - - // Contrarily to the "wholePartitionCached" check above, we do want isFullyCoveredBy to take the - // timestamp of the query into account when dealing with expired columns. Otherwise, we could think - // the cached partition has enough live rows to satisfy the filter when it doesn't because some - // are now expired. - return wholePartitionCached || filter.isFullyCoveredBy(cachedCf, now); - } - - public int gcBefore(long now) - { - return (int) (now / 1000) - metadata.getGcGraceSeconds(); - } - - /** - * get a list of columns starting from a given column, in a specified order. - * only the latest version of a column is returned. 
- * @return null if there is no data and no tombstones; otherwise a ColumnFamily - */ - public ColumnFamily getColumnFamily(QueryFilter filter) - { - assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName(); - - ColumnFamily result = null; - - long start = System.nanoTime(); - try - { - int gcBefore = gcBefore(filter.timestamp); - if (isRowCacheEnabled()) - { - assert !isIndex(); // CASSANDRA-5732 - UUID cfId = metadata.cfId; - - ColumnFamily cached = getThroughCache(cfId, filter); - if (cached == null) - { - logger.trace("cached row is empty"); - return null; - } - - result = cached; - } - else - { - ColumnFamily cf = getTopLevelColumns(filter, gcBefore); + // 2) or we can ensure that any data the filter selects is in the cached partition - if (cf == null) - return null; - - result = removeDeletedCF(cf, gcBefore); - } - - removeDroppedColumns(result); - - if (filter.filter instanceof SliceQueryFilter) - { - // Log the number of tombstones scanned on single key queries - metric.tombstoneScannedHistogram.update(((SliceQueryFilter) filter.filter).lastTombstones()); - metric.liveScannedHistogram.update(((SliceQueryFilter) filter.filter).lastLive()); - } - } - finally - { - metric.readLatency.addNano(System.nanoTime() - start); - } + // We can guarantee that a partition is fully cached if the number of rows it contains is less than + // what we're caching. When doing that, we should be careful about expiring cells: we should not count + // something as expired that wasn't expired when the partition was cached, or we could decide that the whole + // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows. + if (cached.cachedLiveRows() < metadata.getCaching().rowCache.rowsToCache) + return true; - return result; + // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that + // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached + // partition has more live rows than queried (where live rows refers to the rows that are live now), + // or if we can prove that everything the filter selects is in the cached partition based on its content. + return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec)) || filter.isFullyCoveredBy(cached); } - /** - * Filter a cached row, which will not be modified by the filter, but may be modified by throwing out - * tombstones that are no longer relevant. - * The returned column family won't be thread safe. - */ - ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter) + public int gcBefore(int nowInSec) { - if (cached == null) - return null; - - ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed()); - int gcBefore = gcBefore(filter.timestamp); - filter.collateOnDiskAtom(cf, filter.getIterator(cached), gcBefore); - return removeDeletedCF(cf, gcBefore); + return nowInSec - metadata.getGcGraceSeconds(); } public Set<SSTableReader> getUnrepairedSSTables() @@ -1881,7 +1564,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a ViewFragment containing the sstables and memtables that may need to be merged * for rows within @param rowBounds, inclusive, according to the interval tree.
*/ - public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds) + public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<PartitionPosition> rowBounds) { return new Function<View, List<SSTableReader>>() { @@ -1896,14 +1579,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a ViewFragment containing the sstables and memtables that may need to be merged * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree. */ - public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired) + public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<PartitionPosition>> rowBoundsCollection, final boolean includeRepaired) { return new Function<View, List<SSTableReader>>() { public List<SSTableReader> apply(View view) { Set<SSTableReader> sstables = Sets.newHashSet(); - for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection) + for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsCollection) { for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) { @@ -1934,20 +1617,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore) - { - Tracing.trace("Executing single-partition query on {}", name); - CollationController controller = new CollationController(this, filter, gcBefore); - ColumnFamily columns; - try (OpOrder.Group op = readOrdering.start()) - { - columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap()); - } - if (columns != null) - metric.samplers.get(Sampler.READS).addSample(filter.key.getKey(), filter.key.hashCode(), 1); - metric.updateSSTableIterated(controller.getSstablesIterated()); - return columns; - } public void beginLocalSampling(String sampler, int capacity) { @@ -1982,7 +1651,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean RowCacheKey key = keyIter.next(); DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) - invalidateCachedRow(dk); + invalidateCachedPartition(dk); } if (metadata.isCounter()) @@ -1998,247 +1667,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row> - { - public boolean needsFiltering() - { - return true; - } - } - - /** - * Iterate over a range of rows and columns from memtables/sstables. 
- * - * @param range The range of keys and columns within those keys to fetch - */ - @SuppressWarnings("resource") - private AbstractScanIterator getSequentialIterator(final DataRange range, long now) - { - assert !(range.keyRange() instanceof Range) || !((Range<?>)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange(); - - final ViewFragment view = select(viewFilter(range.keyRange())); - Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator())); - - final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now); - - // todo this could be pushed into SSTableScanner - return new AbstractScanIterator() - { - protected Row computeNext() - { - while (true) - { - // pull a row out of the iterator - if (!iterator.hasNext()) - return endOfData(); - - Row current = iterator.next(); - DecoratedKey key = current.key; - - if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0) - return endOfData(); - - // skipping outside of assigned range - if (!range.contains(key)) - continue; - - if (logger.isTraceEnabled()) - logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey())); - - return current; - } - } - - public void close() throws IOException - { - iterator.close(); - } - }; - } - - @VisibleForTesting - public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, - List<IndexExpression> rowFilter, - IDiskAtomFilter columnFilter, - int maxResults) - { - return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis()); - } - - public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, - List<IndexExpression> rowFilter, - IDiskAtomFilter columnFilter, - int maxResults, - long now) - { - return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, false, false, now)); - } - - /** - * Allows generic range paging with the slice column filter. - * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100]. - * And suppose we want to page through the query that for all rows returns the columns - * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c) - * and ending at (row Z, column 75), *but* that only return columns in [25, 75]. - * That is what this method allows. The columnRange is the "window" of columns we are interested - * in each row, and columnStart (resp. columnEnd) is the start (resp. end) for the first - * (resp. last) requested row. 
- */ - public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange, - SliceQueryFilter columnRange, - Composite columnStart, - Composite columnStop, - List<IndexExpression> rowFilter, - int maxResults, - boolean countCQL3Rows, - long now) - { - DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata); - return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, now); - } - - public List<Row> getRangeSlice(AbstractBounds<RowPosition> range, - List<IndexExpression> rowFilter, - IDiskAtomFilter columnFilter, - int maxResults, - long now, - boolean countCQL3Rows, - boolean isPaging) - { - return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging, now)); - } - - public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> range, - IDiskAtomFilter columnFilter, - List<IndexExpression> rowFilter, - int maxResults, - boolean countCQL3Rows, - boolean isPaging, - long timestamp) - { - DataRange dataRange; - if (isPaging) - { - assert columnFilter instanceof SliceQueryFilter; - SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter; - assert sfilter.slices.length == 1; - // create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish - // through to DataRange.Paging to be used on the first and last partitions - SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count); - dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata); - } - else - { - dataRange = new DataRange(range, columnFilter); - } - return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, timestamp); - } - - public List<Row> getRangeSlice(ExtendedFilter filter) - { - long start = System.nanoTime(); - try (OpOrder.Group op = readOrdering.start()) - { - return filter(getSequentialIterator(filter.dataRange, filter.timestamp), filter); - } - finally - { - metric.rangeLatency.addNano(System.nanoTime() - start); - } - } - - @VisibleForTesting - public List<Row> search(AbstractBounds<RowPosition> range, - List<IndexExpression> clause, - IDiskAtomFilter dataFilter, - int maxResults) - { - return search(range, clause, dataFilter, maxResults, System.currentTimeMillis()); - } - - public List<Row> search(AbstractBounds<RowPosition> range, - List<IndexExpression> clause, - IDiskAtomFilter dataFilter, - int maxResults, - long now) - { - return search(makeExtendedFilter(range, dataFilter, clause, maxResults, false, false, now)); - } - - public List<Row> search(ExtendedFilter filter) - { - Tracing.trace("Executing indexed scan for {}", filter.dataRange.keyRange().getString(metadata.getKeyValidator())); - return indexManager.search(filter); - } - - public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter) - { - logger.trace("Filtering {} for rows matching {}", rowIterator, filter); - List<Row> rows = new ArrayList<Row>(); - int columnsCount = 0; - int total = 0, matched = 0; - boolean ignoreTombstonedPartitions = filter.ignoreTombstonedPartitions(); - - try - { - while (rowIterator.hasNext() && matched < filter.maxRows() && columnsCount < filter.maxColumns()) - { - // get the raw columns requested, and additional columns for the expressions if necessary - Row rawRow = rowIterator.next(); - total++; - ColumnFamily data = rawRow.cf; - - if (rowIterator.needsFiltering()) - { - IDiskAtomFilter extraFilter = 
filter.getExtraFilter(rawRow.key, data); - if (extraFilter != null) - { - ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp)); - if (cf != null) - data.addAll(cf); - } - - removeDroppedColumns(data); - - if (!filter.isSatisfiedBy(rawRow.key, data, null, null)) - continue; - - logger.trace("{} satisfies all filter expressions", data); - // cut the resultset back to what was requested, if necessary - data = filter.prune(rawRow.key, data); - } - else - { - removeDroppedColumns(data); - } - - rows.add(new Row(rawRow.key, data)); - if (!ignoreTombstonedPartitions || !data.hasOnlyTombstones(filter.timestamp)) - matched++; - - if (data != null) - columnsCount += filter.lastCounted(data); - // Update the underlying filter to avoid querying more columns per slice than necessary and to handle paging - filter.updateFilter(columnsCount); - } - - return rows; - } - finally - { - try - { - rowIterator.close(); - Tracing.trace("Scanned {} rows and matched {}", total, matched); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - public CellNameType getComparator() + public ClusteringComparator getComparator() { return metadata.comparator; } @@ -2388,20 +1817,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * @return the cached row for @param key if it is already present in the cache. - * That is, unlike getThroughCache, it will not readAndCache the row if it is not present, nor + * @return the cached partition for @param key if it is already present in the cache. + * Note that this will not readAndCache the partition if it is not present, nor * are these calls counted in cache statistics. * - * Note that this WILL cause deserialization of a SerializingCache row, so if all you - * need to know is whether a row is present or not, use containsCachedRow instead. + * Note that this WILL cause deserialization of a SerializingCache partition, so if all you + * need to know is whether a partition is present or not, use containsCachedParition instead. */ - public ColumnFamily getRawCachedRow(DecoratedKey key) + public CachedPartition getRawCachedPartition(DecoratedKey key) { if (!isRowCacheEnabled()) return null; IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key)); - return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached; + return cached == null || cached instanceof RowCacheSentinel ?
null : (CachedPartition)cached; } private void invalidateCaches() @@ -2415,37 +1844,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** * @return true if @param key is contained in the row cache */ - public boolean containsCachedRow(DecoratedKey key) + public boolean containsCachedParition(DecoratedKey key) { return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key)); } - public void invalidateCachedRow(RowCacheKey key) + public void invalidateCachedPartition(RowCacheKey key) { CacheService.instance.rowCache.remove(key); } - public void invalidateCachedRow(DecoratedKey key) + public void invalidateCachedPartition(DecoratedKey key) { UUID cfId = Schema.instance.getId(keyspace.getName(), this.name); if (cfId == null) return; // secondary index - invalidateCachedRow(new RowCacheKey(cfId, key)); + invalidateCachedPartition(new RowCacheKey(cfId, key)); } - public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName) + public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path) { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. return null; - return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName)); + return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path)); } - public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount) + public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount) { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. return; - CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount); + CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path), clockAndCount); } public void forceMajorCompaction() throws InterruptedException, ExecutionException @@ -2830,7 +2259,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0; } - private boolean isRowCacheEnabled() + public boolean isRowCacheEnabled() { return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0; }
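[Illustration, not part of the patch] The reworked row-cache path above replaces the old getThroughCache/filterColumnFamily logic with a coverage check: a cached partition can answer a query either because the whole partition fits under the rowsToCache cap (judged by liveness at caching time, via CachedPartition#cachedLiveRows), or because a head filter asks for no more live data than the cached entry still holds. The sketch below mirrors that decision with simplified stand-in types (CachedEntry and plain ints instead of CachedPartition, ClusteringIndexFilter and DataLimits); it omits the content-based isFullyCoveredBy branch.

// Illustration only: simplified stand-ins, not Cassandra's real types.
final class RowCacheCoverageSketch
{
    static final class CachedEntry
    {
        final int cachedLiveRows;  // rows that were live when the partition was cached
        final int liveRowsNow;     // rows still live now (some may have expired since caching)
        CachedEntry(int cachedLiveRows, int liveRowsNow)
        {
            this.cachedLiveRows = cachedLiveRows;
            this.liveRowsNow = liveRowsNow;
        }
    }

    // Mirrors the shape of the new ColumnFamilyStore#isFilterFullyCoveredBy.
    static boolean filterFullyCoveredBy(boolean isHeadFilter, int rowsQueried, CachedEntry cached, int rowsToCache)
    {
        // Whole partition cached: fewer rows were live at caching time than the cache cap.
        // Liveness is taken at caching time on purpose; rows that expired since must still count.
        if (cached.cachedLiveRows < rowsToCache)
            return true;

        // Otherwise, a head filter is covered if the entry still holds enough rows that are live now.
        return isHeadFilter && cached.liveRowsNow >= rowsQueried;
    }

    public static void main(String[] args)
    {
        System.out.println(filterFullyCoveredBy(true, 100, new CachedEntry(150, 120), 200)); // true: whole partition cached
        System.out.println(filterFullyCoveredBy(true, 130, new CachedEntry(200, 120), 200)); // false: not enough live rows cached
    }
}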
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilyType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyType.java b/src/java/org/apache/cassandra/db/ColumnFamilyType.java deleted file mode 100644 index 51e8b63..0000000 --- a/src/java/org/apache/cassandra/db/ColumnFamilyType.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -/** - * column family type enum - */ -public enum ColumnFamilyType -{ - Standard, - Super; - - public static ColumnFamilyType create(String name) - { - try - { - // TODO thrift optional parameter in CfDef is leaking down here which it shouldn't - return name == null ? ColumnFamilyType.Standard : ColumnFamilyType.valueOf(name); - } - catch (IllegalArgumentException e) - { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index d9d6a9c..1a9b92d 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -18,15 +18,15 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.IndexHelper; -import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.utils.ByteBufferUtil; public class ColumnIndex @@ -42,6 +42,14 @@ public class ColumnIndex this.columnsIndex = columnsIndex; } + public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, SequentialWriter output, SerializationHeader header, Version version) throws IOException + { + assert !iterator.isEmpty() && version.storeRows(); + + Builder builder = new Builder(iterator, output, header, version.correspondingMessagingVersion()); + return builder.build(); + } + @VisibleForTesting public static ColumnIndex nothing() { @@ -52,192 +60,114 @@ public class ColumnIndex * Help to create an index for a column family based on size of columns, * and write said columns to disk. 
*/ - public static class Builder + private static class Builder { + private final UnfilteredRowIterator iterator; + private final SequentialWriter writer; + private final SerializationHeader header; + private final int version; + private final ColumnIndex result; - private final long indexOffset; + private final long initialPosition; private long startPosition = -1; - private long endPosition = 0; - private long blockSize; - private OnDiskAtom firstColumn; - private OnDiskAtom lastColumn; - private OnDiskAtom lastBlockClosing; - private final DataOutputPlus output; - private final RangeTombstone.Tracker tombstoneTracker; - private int atomCount; - private final ByteBuffer key; - private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size - - private final OnDiskAtom.Serializer atomSerializer; - - public Builder(ColumnFamily cf, - ByteBuffer key, - DataOutputPlus output) - { - assert cf != null; - assert key != null; - assert output != null; - this.key = key; - deletionInfo = cf.deletionInfo(); - this.indexOffset = rowHeaderSize(key, deletionInfo); - this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>()); - this.output = output; - this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator()); - this.atomSerializer = cf.getComparator().onDiskAtomSerializer(); - } + private int written; - /** - * Returns the number of bytes between the beginning of the row and the - * first serialized column. - */ - private static long rowHeaderSize(ByteBuffer key, DeletionInfo delInfo) - { - TypeSizes typeSizes = TypeSizes.NATIVE; - // TODO fix constantSize when changing the nativeconststs. - int keysize = key.remaining(); - return typeSizes.sizeof((short) keysize) + keysize // Row key - + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes); - } + private ClusteringPrefix firstClustering; + private final ReusableClusteringPrefix lastClustering; + + private DeletionTime openMarker; - public RangeTombstone.Tracker tombstoneTracker() + public Builder(UnfilteredRowIterator iterator, + SequentialWriter writer, + SerializationHeader header, + int version) { - return tombstoneTracker; + this.iterator = iterator; + this.writer = writer; + this.header = header; + this.version = version; + + this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>()); + this.initialPosition = writer.getFilePointer(); + this.lastClustering = new ReusableClusteringPrefix(iterator.metadata().clusteringColumns().size()); } - public int writtenAtomCount() + private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException { - return atomCount + tombstoneTracker.writtenAtom(); + ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer.stream); + DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer.stream); + if (header.hasStatic()) + UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer.stream, version); } - /** - * Serializes the index into in-memory structure with all required components - * such as Bloom Filter, index block size, IndexInfo list - * - * @param cf Column family to create index for - * - * @return information about index - it's Bloom Filter, block size and IndexInfo list - */ - public ColumnIndex build(ColumnFamily cf) throws IOException + public ColumnIndex build() throws IOException { - // cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order - Comparator<Composite> comparator = 
cf.getComparator(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(); - Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator(); - RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null; - - for (Cell c : cf) - { - while (tombstone != null && comparator.compare(c.name(), tombstone.min) >= 0) - { - // skip range tombstones that are shadowed by partition tombstones - if (!cf.deletionInfo().getTopLevelDeletion().isDeleted(tombstone)) - add(tombstone); - tombstone = rangeIter.hasNext() ? rangeIter.next() : null; - } - - // We can skip any cell if it's shadowed by a tombstone already. This is a more - // general case than was handled by CASSANDRA-2589. - if (!tester.isDeleted(c)) - add(c); - } - - while (tombstone != null) - { - add(tombstone); - tombstone = rangeIter.hasNext() ? rangeIter.next() : null; - } - ColumnIndex index = build(); + writePartitionHeader(iterator); - maybeWriteEmptyRowHeader(); + while (iterator.hasNext()) + add(iterator.next()); - return index; + return close(); } - /** - * The important distinction wrt build() is that we may be building for a row that ends up - * being compacted away entirely, i.e., the input consists only of expired tombstones (or - * columns shadowed by expired tombstone). Thus, it is the caller's responsibility - * to decide whether to write the header for an empty row. - */ - public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException + private long currentPosition() { - while (columns.hasNext()) - { - OnDiskAtom c = columns.next(); - add(c); - } + return writer.getFilePointer() - initialPosition; + } - return build(); + private void addIndexBlock() + { + IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering, + lastClustering.get().takeAlias(), + startPosition, + currentPosition() - startPosition, + openMarker); + result.columnsIndex.add(cIndexInfo); + firstClustering = null; } - public void add(OnDiskAtom column) throws IOException + private void add(Unfiltered unfiltered) throws IOException { - atomCount++; + lastClustering.copy(unfiltered.clustering()); + boolean isMarker = unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER; - if (firstColumn == null) + if (firstClustering == null) { - firstColumn = column; - startPosition = endPosition; - // TODO: have that use the firstColumn as min + make sure we optimize that on read - endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer); - blockSize = 0; // We don't count repeated tombstone marker in the block size, to avoid a situation - // where we wouldn't make any progress because a block is filled by said marker + // Beginning of an index block. Remember the start and position + firstClustering = lastClustering.get().takeAlias(); + startPosition = currentPosition(); } - long size = atomSerializer.serializedSizeForSSTable(column); - endPosition += size; - blockSize += size; + UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version); + ++written; - // if we hit the column index size that we have to index after, go ahead and index it. 
- if (blockSize >= DatabaseDescriptor.getColumnIndexSize()) + if (isMarker) { - IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), indexOffset + startPosition, endPosition - startPosition); - result.columnsIndex.add(cIndexInfo); - firstColumn = null; - lastBlockClosing = column; + RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered; + openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null; } - maybeWriteRowHeader(); - atomSerializer.serializeForSSTable(column, output); - - // TODO: Should deal with removing unneeded tombstones - tombstoneTracker.update(column, false); - - lastColumn = column; + // if we hit the column index size that we have to index after, go ahead and index it. + if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize()) + addIndexBlock(); } - private void maybeWriteRowHeader() throws IOException + private ColumnIndex close() throws IOException { - if (lastColumn == null) - { - ByteBufferUtil.writeWithShortLength(key, output); - DeletionTime.serializer.serialize(deletionInfo.getTopLevelDeletion(), output); - } - } + UnfilteredSerializer.serializer.writeEndOfPartition(writer.stream); - public ColumnIndex build() - { - // all columns were GC'd after all - if (lastColumn == null) + // It's possible we add no rows, just a top level deletion + if (written == 0) return ColumnIndex.EMPTY; // the last column may have fallen on an index boundary already. if not, index it explicitly. - if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn) - { - IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), indexOffset + startPosition, endPosition - startPosition); - result.columnsIndex.add(cIndexInfo); - } + if (firstClustering != null) + addIndexBlock(); // we should always have at least one computed index block, but we only write it out if there is more than that. assert result.columnsIndex.size() > 0; return result; } - - public void maybeWriteEmptyRowHeader() throws IOException - { - if (!deletionInfo.isLive()) - maybeWriteRowHeader(); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java deleted file mode 100644 index 8e7026c..0000000 --- a/src/java/org/apache/cassandra/db/ColumnSerializer.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.FSReadError; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class ColumnSerializer implements ISerializer<Cell> -{ - public final static int DELETION_MASK = 0x01; - public final static int EXPIRATION_MASK = 0x02; - public final static int COUNTER_MASK = 0x04; - public final static int COUNTER_UPDATE_MASK = 0x08; - public final static int RANGE_TOMBSTONE_MASK = 0x10; - - /** - * Flag affecting deserialization behavior. - * - LOCAL: for deserialization of local data (Expired columns are - * converted to tombstones (to gain disk space)). - * - FROM_REMOTE: for deserialization of data received from remote hosts - * (Expired columns are converted to tombstone and counters have - * their delta cleared) - * - PRESERVE_SIZE: used when no transformation must be performed, i.e, - * when we must ensure that deserializing and reserializing the - * result yield the exact same bytes. Streaming uses this. - */ - public static enum Flag - { - LOCAL, FROM_REMOTE, PRESERVE_SIZE; - } - - private final CellNameType type; - - public ColumnSerializer(CellNameType type) - { - this.type = type; - } - - public void serialize(Cell cell, DataOutputPlus out) throws IOException - { - assert !cell.name().isEmpty(); - type.cellSerializer().serialize(cell.name(), out); - try - { - out.writeByte(cell.serializationFlags()); - if (cell instanceof CounterCell) - { - out.writeLong(((CounterCell) cell).timestampOfLastDelete()); - } - else if (cell instanceof ExpiringCell) - { - out.writeInt(((ExpiringCell) cell).getTimeToLive()); - out.writeInt(cell.getLocalDeletionTime()); - } - out.writeLong(cell.timestamp()); - ByteBufferUtil.writeWithLength(cell.value(), out); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - public Cell deserialize(DataInput in) throws IOException - { - return deserialize(in, Flag.LOCAL); - } - - /* - * For counter columns, we must know when we deserialize them if what we - * deserialize comes from a remote host. If it does, then we must clear - * the delta. 
- */ - public Cell deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException - { - return deserialize(in, flag, Integer.MIN_VALUE); - } - - public Cell deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException - { - CellName name = type.cellSerializer().deserialize(in); - - int b = in.readUnsignedByte(); - return deserializeColumnBody(in, name, b, flag, expireBefore); - } - - Cell deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException - { - if ((mask & COUNTER_MASK) != 0) - { - long timestampOfLastDelete = in.readLong(); - long ts = in.readLong(); - ByteBuffer value = ByteBufferUtil.readWithLength(in); - return BufferCounterCell.create(name, value, ts, timestampOfLastDelete, flag); - } - else if ((mask & EXPIRATION_MASK) != 0) - { - int ttl = in.readInt(); - int expiration = in.readInt(); - long ts = in.readLong(); - ByteBuffer value = ByteBufferUtil.readWithLength(in); - return BufferExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag); - } - else - { - long ts = in.readLong(); - ByteBuffer value = ByteBufferUtil.readWithLength(in); - return (mask & COUNTER_UPDATE_MASK) != 0 - ? new BufferCounterUpdateCell(name, value, ts) - : ((mask & DELETION_MASK) == 0 - ? new BufferCell(name, value, ts) - : new BufferDeletedCell(name, value, ts)); - } - } - - void skipColumnBody(DataInput in, int mask) throws IOException - { - if ((mask & COUNTER_MASK) != 0) - FileUtils.skipBytesFully(in, 16); - else if ((mask & EXPIRATION_MASK) != 0) - FileUtils.skipBytesFully(in, 16); - else - FileUtils.skipBytesFully(in, 8); - - int length = in.readInt(); - FileUtils.skipBytesFully(in, length); - } - - public long serializedSize(Cell cell, TypeSizes typeSizes) - { - return cell.serializedSize(type, typeSizes); - } - - public static class CorruptColumnException extends IOException - { - public CorruptColumnException(String s) - { - super(s); - } - - public static CorruptColumnException create(DataInput in, ByteBuffer name) - { - assert name.remaining() <= 0; - String format = "invalid column name length %d%s"; - String details = ""; - if (in instanceof FileDataInput) - { - FileDataInput fdis = (FileDataInput)in; - long remaining; - try - { - remaining = fdis.bytesRemaining(); - } - catch (IOException e) - { - throw new FSReadError(e, fdis.getPath()); - } - details = String.format(" (%s, %d bytes remaining)", fdis.getPath(), remaining); - } - return new CorruptColumnException(String.format(format, name.remaining(), details)); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java new file mode 100644 index 0000000..83d39db --- /dev/null +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.util.*; +import java.nio.ByteBuffer; +import java.security.MessageDigest; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * An immutable and sorted list of (non-PK) columns for a given table. + * <p> + * Note that in practice, it will either store only static columns, or only regular ones. When + * we need both types of columns, we use a {@link PartitionColumns} object. + */ +public class Columns implements Iterable<ColumnDefinition> +{ + public static final Serializer serializer = new Serializer(); + public static final Columns NONE = new Columns(new ColumnDefinition[0], 0); + + public final ColumnDefinition[] columns; + public final int complexIdx; // Index of the first complex column + + private Columns(ColumnDefinition[] columns, int complexIdx) + { + assert complexIdx <= columns.length; + this.columns = columns; + this.complexIdx = complexIdx; + } + + private Columns(ColumnDefinition[] columns) + { + this(columns, findFirstComplexIdx(columns)); + } + + /** + * Creates a {@code Columns} holding only the one column provided. + * + * @param c the column for which to create a {@code Columns} object. + * + * @return the newly created {@code Columns} containing only {@code c}. + */ + public static Columns of(ColumnDefinition c) + { + ColumnDefinition[] columns = new ColumnDefinition[]{ c }; + return new Columns(columns, c.isComplex() ? 0 : 1); + } + + /** + * Returns a new {@code Columns} object holding the same columns as the provided set. + * + * @param s the set from which to create the new {@code Columns}. + * + * @return the newly created {@code Columns} containing the columns from {@code s}. + */ + public static Columns from(Set<ColumnDefinition> s) + { + ColumnDefinition[] columns = s.toArray(new ColumnDefinition[s.size()]); + Arrays.sort(columns); + return new Columns(columns, findFirstComplexIdx(columns)); + } + + private static int findFirstComplexIdx(ColumnDefinition[] columns) + { + for (int i = 0; i < columns.length; i++) + if (columns[i].isComplex()) + return i; + return columns.length; + } + + /** + * Whether this object is empty. + * + * @return whether this object is empty. + */ + public boolean isEmpty() + { + return columns.length == 0; + } + + /** + * The number of simple columns in this object. + * + * @return the number of simple columns in this object. + */ + public int simpleColumnCount() + { + return complexIdx; + } + + /** + * The number of complex columns (non-frozen collections, udts, ...) in this object. + * + * @return the number of complex columns in this object.
+ */ + public int complexColumnCount() + { + return columns.length - complexIdx; + } + + /** + * The total number of columns in this object. + * + * @return the total number of columns in this object. + */ + public int columnCount() + { + return columns.length; + } + + /** + * Whether this object contains simple columns. + * + * @return whether this object contains simple columns. + */ + public boolean hasSimple() + { + return complexIdx > 0; + } + + /** + * Whether this object contains complex columns. + * + * @return whether this object contains complex columns. + */ + public boolean hasComplex() + { + return complexIdx < columns.length; + } + + /** + * Returns the ith simple column of this object. + * + * @param i the index for the simple column to fetch. This must + * satisfy {@code 0 <= i < simpleColumnCount()}. + * + * @return the {@code i}th simple column in this object. + */ + public ColumnDefinition getSimple(int i) + { + return columns[i]; + } + + /** + * Returns the ith complex column of this object. + * + * @param i the index for the complex column to fetch. This must + * satisfy {@code 0 <= i < complexColumnCount()}. + * + * @return the {@code i}th complex column in this object. + */ + public ColumnDefinition getComplex(int i) + { + return columns[complexIdx + i]; + } + + /** + * The index of the provided simple column in this object (if it contains + * the provided column). + * + * @param c the simple column for which to return the index. + * @param from the index to start the search from. + * + * @return the index for simple column {@code c} if it is contained in this + * object (starting from index {@code from}), {@code -1} otherwise. + */ + public int simpleIdx(ColumnDefinition c, int from) + { + assert !c.isComplex(); + for (int i = from; i < complexIdx; i++) + // We know we only use "interned" ColumnIdentifier so == is ok. + if (columns[i].name == c.name) + return i; + return -1; + } + + /** + * The index of the provided complex column in this object (if it contains + * the provided column). + * + * @param c the complex column for which to return the index. + * @param from the index to start the search from. + * + * @return the index for complex column {@code c} if it is contained in this + * object (starting from index {@code from}), {@code -1} otherwise. + */ + public int complexIdx(ColumnDefinition c, int from) + { + assert c.isComplex(); + for (int i = complexIdx + from; i < columns.length; i++) + // We know we only use "interned" ColumnIdentifier so == is ok. + if (columns[i].name == c.name) + return i - complexIdx; + return -1; + } + + /** + * Whether the provided column is contained by this object. + * + * @param c the column to check presence of. + * + * @return whether {@code c} is contained by this object. + */ + public boolean contains(ColumnDefinition c) + { + return c.isComplex() ? complexIdx(c, 0) >= 0 : simpleIdx(c, 0) >= 0; + } + + /** + * Whether or not there are counter columns within those columns. + * + * @return whether or not there are counter columns within those columns.
+ */ + public boolean hasCounters() + { + for (int i = 0; i < complexIdx; i++) + { + if (columns[i].type.isCounter()) + return true; + } + + for (int i = complexIdx; i < columns.length; i++) + { + // We only support counter in maps because that's all we need for now (and we need it for the sake of thrift super columns of counter) + if (columns[i].type instanceof MapType && (((MapType)columns[i].type).valueComparator().isCounter())) + return true; + } + + return false; + } + + /** + * Returns the result of merging this {@code Columns} object with the + * provided one. + * + * @param other the other {@code Columns} to merge this object with. + * + * @return the result of merging/taking the union of {@code this} and + * {@code other}. The returned object may be one of the operand and that + * operand is a subset of the other operand. + */ + public Columns mergeTo(Columns other) + { + if (this == other || other == NONE) + return this; + if (this == NONE) + return other; + + int i = 0, j = 0; + int size = 0; + while (i < columns.length && j < other.columns.length) + { + ++size; + int cmp = columns[i].compareTo(other.columns[j]); + if (cmp == 0) + { + ++i; + ++j; + } + else if (cmp < 0) + { + ++i; + } + else + { + ++j; + } + } + + // If every element was always counted on both array, we have the same + // arrays for the first min elements + if (i == size && j == size) + { + // We've exited because of either c1 or c2 (or both). The array that + // made us stop is thus a subset of the 2nd one, return that array. + return i == columns.length ? other : this; + } + + size += i == columns.length ? other.columns.length - j : columns.length - i; + ColumnDefinition[] result = new ColumnDefinition[size]; + i = 0; + j = 0; + for (int k = 0; k < size; k++) + { + int cmp = i >= columns.length ? 1 + : (j >= other.columns.length ? -1 : columns[i].compareTo(other.columns[j])); + if (cmp == 0) + { + result[k] = columns[i]; + ++i; + ++j; + } + else if (cmp < 0) + { + result[k] = columns[i++]; + } + else + { + result[k] = other.columns[j++]; + } + } + return new Columns(result, findFirstComplexIdx(result)); + } + + /** + * Whether this object is a subset of the provided other {@code Columns object}. + * + * @param other the othere object to test for inclusion in this object. + * + * @return whether all the columns of {@code other} are contained by this object. + */ + public boolean contains(Columns other) + { + if (other.columns.length > columns.length) + return false; + + int j = 0; + int cmp = 0; + for (ColumnDefinition def : other.columns) + { + while (j < columns.length && (cmp = columns[j].compareTo(def)) < 0) + j++; + + if (j >= columns.length || cmp > 0) + return false; + + // cmp == 0, we've found the definition. Ce can bump j once more since + // we know we won't need to compare that element again + j++; + } + return true; + } + + /** + * Iterator over the simple columns of this object. + * + * @return an iterator over the simple columns of this object. + */ + public Iterator<ColumnDefinition> simpleColumns() + { + return new ColumnIterator(0, complexIdx); + } + + /** + * Iterator over the complex columns of this object. + * + * @return an iterator over the complex columns of this object. + */ + public Iterator<ColumnDefinition> complexColumns() + { + return new ColumnIterator(complexIdx, columns.length); + } + + /** + * Iterator over all the columns of this object. + * + * @return an iterator over all the columns of this object. 
+ */ + public Iterator<ColumnDefinition> iterator() + { + return Iterators.forArray(columns); + } + + /** + * An iterator that returns the columns of this object in "select" order (that + * is in global alphabetical order, where the "normal" iterator returns simple + * columns first and the complex second). + * + * @return an iterator returning columns in alphabetical order. + */ + public Iterator<ColumnDefinition> selectOrderIterator() + { + // In wildcard selection, we want to return all columns in alphabetical order, + // irregarding of whether they are complex or not + return new AbstractIterator<ColumnDefinition>() + { + private int regular; + private int complex = complexIdx; + + protected ColumnDefinition computeNext() + { + if (complex >= columns.length) + return regular >= complexIdx ? endOfData() : columns[regular++]; + if (regular >= complexIdx) + return columns[complex++]; + + return ByteBufferUtil.compareUnsigned(columns[regular].name.bytes, columns[complex].name.bytes) < 0 + ? columns[regular++] + : columns[complex++]; + } + }; + } + + /** + * Returns the equivalent of those columns but with the provided column removed. + * + * @param column the column to remove. + * + * @return newly allocated columns containing all the columns of {@code this} expect + * for {@code column}. + */ + public Columns without(ColumnDefinition column) + { + int idx = column.isComplex() ? complexIdx(column, 0) : simpleIdx(column, 0); + if (idx < 0) + return this; + + int realIdx = column.isComplex() ? complexIdx + idx : idx; + + ColumnDefinition[] newColumns = new ColumnDefinition[columns.length - 1]; + System.arraycopy(columns, 0, newColumns, 0, realIdx); + System.arraycopy(columns, realIdx + 1, newColumns, realIdx, newColumns.length - realIdx); + return new Columns(newColumns); + } + + public void digest(MessageDigest digest) + { + for (ColumnDefinition c : this) + digest.update(c.name.bytes.duplicate()); + } + + @Override + public boolean equals(Object other) + { + if (!(other instanceof Columns)) + return false; + + Columns that = (Columns)other; + return this.complexIdx == that.complexIdx && Arrays.equals(this.columns, that.columns); + } + + @Override + public int hashCode() + { + return Objects.hash(complexIdx, Arrays.hashCode(columns)); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (ColumnDefinition def : this) + { + if (first) first = false; else sb.append(" "); + sb.append(def.name); + } + return sb.toString(); + } + + private class ColumnIterator extends AbstractIterator<ColumnDefinition> + { + private final int to; + private int idx; + + private ColumnIterator(int from, int to) + { + this.idx = from; + this.to = to; + } + + protected ColumnDefinition computeNext() + { + if (idx >= to) + return endOfData(); + return columns[idx++]; + } + } + + public static class Serializer + { + public void serialize(Columns columns, DataOutputPlus out) throws IOException + { + out.writeShort(columns.columnCount()); + for (ColumnDefinition column : columns) + ByteBufferUtil.writeWithShortLength(column.name.bytes, out); + } + + public long serializedSize(Columns columns, TypeSizes sizes) + { + long size = sizes.sizeof((short)columns.columnCount()); + for (ColumnDefinition column : columns) + size += sizes.sizeofWithShortLength(column.name.bytes); + return size; + } + + public Columns deserialize(DataInput in, CFMetaData metadata) throws IOException + { + int length = in.readUnsignedShort(); + ColumnDefinition[] columns = new 
ColumnDefinition[length]; + for (int i = 0; i < length; i++) + { + ByteBuffer name = ByteBufferUtil.readWithShortLength(in); + ColumnDefinition column = metadata.getColumnDefinition(name); + if (column == null) + { + // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't + // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper + // deserialization. The column will be ignore later on anyway. + column = metadata.getDroppedColumnDefinition(name); + if (column == null) + throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + } + columns[i] = column; + } + return new Columns(columns); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CompactTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CompactTables.java b/src/java/org/apache/cassandra/db/CompactTables.java new file mode 100644 index 0000000..a72e7f2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/CompactTables.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * Small utility methods pertaining to the encoding of COMPACT STORAGE tables. + * + * COMPACT STORAGE tables exists mainly for the sake of encoding internally thrift tables (as well as + * exposing those tables through CQL). Note that due to these constraints, the internal representation + * of compact tables does *not* correspond exactly to their CQL definition. + * + * The internal layout of such tables is such that it can encode any thrift table. That layout is as follow: + * CREATE TABLE compact ( + * key [key_validation_class], + * [column_metadata_1] [type1] static, + * ..., + * [column_metadata_n] [type1] static, + * column [comparator], + * value [default_validation_class] + * PRIMARY KEY (key, column) + * ) + * More specifically, the table: + * - always has a clustering column and a regular value, which are used to store the "dynamic" thrift columns name and value. + * Those are always present because we have no way to know in advance if "dynamic" columns will be inserted or not. Note + * that when declared from CQL, compact tables may not have any clustering: in that case, we still have a clustering + * defined internally, it is just ignored as far as interacting from CQL is concerned. 
+ * - have a static column for every "static" column defined in the thrift "column_metadata". Note that when declaring a compact + * table from CQL without any clustering (but some non-PK columns), the columns ends up static internally even though they are + * not in the declaration + * + * On variation is that if the table comparator is a CompositeType, then the underlying table will have one clustering column by + * element of the CompositeType, but the rest of the layout is as above. + * + * As far as thrift is concerned, one exception to this is super column families, which have a different layout. Namely, a super + * column families is encoded with: + * CREATE TABLE super ( + * key [key_validation_class], + * super_column_name [comparator], + * [column_metadata_1] [type1], + * ..., + * [column_metadata_n] [type1], + * "" map<[sub_comparator], [default_validation_class]> + * PRIMARY KEY (key, super_column_name) + * ) + * In other words, every super column is encoded by a row. That row has one column for each defined "column_metadata", but it also + * has a special map column (whose name is the empty string as this is guaranteed to never conflict with a user-defined + * "column_metadata") which stores the super column "dynamic" sub-columns. + */ +public abstract class CompactTables +{ + // We use an empty value for the 1) this can't conflict with a user-defined column and 2) this actually + // validate with any comparator which makes it convenient for columnDefinitionComparator(). + public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER; + public static final String SUPER_COLUMN_MAP_COLUMN_STR = UTF8Type.instance.compose(SUPER_COLUMN_MAP_COLUMN); + + private CompactTables() {} + + public static ColumnDefinition getCompactValueColumn(PartitionColumns columns, boolean isSuper) + { + if (isSuper) + { + for (ColumnDefinition column : columns.regulars) + if (column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN)) + return column; + throw new AssertionError("Invalid super column table definition, no 'dynamic' map column"); + } + assert columns.regulars.simpleColumnCount() == 1 && columns.regulars.complexColumnCount() == 0; + return columns.regulars.getSimple(0); + } + + public static AbstractType<?> columnDefinitionComparator(ColumnDefinition.Kind kind, boolean isSuper, AbstractType<?> rawComparator, AbstractType<?> rawSubComparator) + { + if (isSuper) + return kind == ColumnDefinition.Kind.REGULAR ? rawSubComparator : UTF8Type.instance; + else + return kind == ColumnDefinition.Kind.STATIC ? 
rawComparator : UTF8Type.instance; + } + + public static boolean hasEmptyCompactValue(CFMetaData metadata) + { + return metadata.compactValueColumn().type instanceof EmptyType; + } + + public static boolean isSuperColumnMapColumn(ColumnDefinition column) + { + return column.kind == ColumnDefinition.Kind.REGULAR && column.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN); + } + + public static DefaultNames defaultNameGenerator(Set<String> usedNames) + { + return new DefaultNames(new HashSet<String>(usedNames)); + } + + public static DefaultNames defaultNameGenerator(Iterable<ColumnDefinition> defs) + { + Set<String> usedNames = new HashSet<>(); + for (ColumnDefinition def : defs) + usedNames.add(def.name.toString()); + return new DefaultNames(usedNames); + } + + public static class DefaultNames + { + private static final String DEFAULT_PARTITION_KEY_NAME = "key"; + private static final String DEFAULT_CLUSTERING_NAME = "column"; + private static final String DEFAULT_COMPACT_VALUE_NAME = "value"; + + private final Set<String> usedNames; + private int partitionIndex = 0; + private int clusteringIndex = 1; + private int compactIndex = 0; + + private DefaultNames(Set<String> usedNames) + { + this.usedNames = usedNames; + } + + public String defaultPartitionKeyName() + { + while (true) + { + // For compatibility sake, we call the first alias 'key' rather than 'key1'. This + // is inconsistent with column alias, but it's probably not worth risking breaking compatibility now. + String candidate = partitionIndex == 0 ? DEFAULT_PARTITION_KEY_NAME : DEFAULT_PARTITION_KEY_NAME + (partitionIndex + 1); + ++partitionIndex; + if (usedNames.add(candidate)) + return candidate; + } + } + + public String defaultClusteringName() + { + while (true) + { + String candidate = DEFAULT_CLUSTERING_NAME + clusteringIndex; + ++clusteringIndex; + if (usedNames.add(candidate)) + return candidate; + } + } + + public String defaultCompactValueName() + { + while (true) + { + String candidate = compactIndex == 0 ? DEFAULT_COMPACT_VALUE_NAME : DEFAULT_COMPACT_VALUE_NAME + compactIndex; + ++compactIndex; + if (usedNames.add(candidate)) + return candidate; + } + } + } +}
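
For reference, here is a minimal usage sketch of the DefaultNames generator introduced in CompactTables above. The seeded names and the printed results are illustrative assumptions only, not taken from the patch, and the example assumes the patched Cassandra classes are on the classpath.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.cassandra.db.CompactTables;

    public class DefaultNamesExample
    {
        public static void main(String[] args)
        {
            // Hypothetical input: "key" and "column1" are already used by declared columns.
            Set<String> used = new HashSet<>(Arrays.asList("key", "column1"));
            CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(used);

            // "key" collides, so the generator skips to "key2" (the plain "key" alias is only used when free).
            System.out.println(names.defaultPartitionKeyName()); // key2

            // Clustering candidates start at "column1"; that collides, so "column2" is returned.
            System.out.println(names.defaultClusteringName());   // column2

            // "value" is free, so it is returned as-is.
            System.out.println(names.defaultCompactValueName()); // value
        }
    }

Each returned name is also recorded as used, so repeated calls keep producing fresh, non-conflicting names.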

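
Stepping back to Columns.mergeTo earlier in this commit: its two-pointer union over the sorted column arrays avoids any allocation when one operand already contains the other. A standalone sketch of the same technique over plain sorted String arrays (the names and types here are illustrative, not part of the patch) could look like this:

    import java.util.Arrays;

    public class SortedUnionSketch
    {
        // Union of two sorted, duplicate-free arrays. Mirrors the shape of Columns.mergeTo:
        // when one input turns out to be a subset of the other, the superset is returned directly.
        static String[] union(String[] a, String[] b)
        {
            int i = 0, j = 0, size = 0;
            while (i < a.length && j < b.length)
            {
                size++;
                int cmp = a[i].compareTo(b[j]);
                if (cmp <= 0) i++;   // advance a on a-only or common elements
                if (cmp >= 0) j++;   // advance b on b-only or common elements
            }

            // If every loop iteration advanced both pointers, the shorter array is a prefix
            // (hence a subset) of the longer one, so return the longer operand unchanged.
            if (i == size && j == size)
                return i == a.length ? b : a;

            // Otherwise allocate the exact union size and merge, draining whichever side remains.
            size += (i == a.length) ? b.length - j : a.length - i;
            String[] result = new String[size];
            i = 0;
            j = 0;
            for (int k = 0; k < size; k++)
            {
                int cmp = i >= a.length ? 1
                        : (j >= b.length ? -1 : a[i].compareTo(b[j]));
                if (cmp == 0)
                {
                    result[k] = a[i++];
                    j++;
                }
                else if (cmp < 0)
                {
                    result[k] = a[i++];
                }
                else
                {
                    result[k] = b[j++];
                }
            }
            return result;
        }

        public static void main(String[] args)
        {
            String[] a = { "a", "c", "d" };
            System.out.println(Arrays.toString(union(a, new String[]{ "a", "c" }))); // [a, c, d] (returns a itself)
            System.out.println(Arrays.toString(union(a, new String[]{ "b" })));      // [a, b, c, d]
        }
    }

The real method compares ColumnDefinition objects rather than Strings and recomputes the index of the first complex column for the merged array, but the control flow is the same.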