http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
new file mode 100644
index 0000000..73cedb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A clustering prefix is basically the unit of what a {@link ClusteringComparator} can compare.
+ * <p>
+ * It holds values for the clustering columns of a table (potentially only a prefix of all of them) and it has
+ * a "kind" that allows us to implement slices with inclusive and exclusive bounds.
+ * <p>
+ * In practice, {@code ClusteringPrefix} is just the common part of its two main subtypes: {@link Clustering} and
+ * {@link Slice.Bound}, where:
+ *   1) {@code Clustering} represents the clustering values for a row, i.e. the values for its clustering columns.
+ *   2) {@code Slice.Bound} represents a bound (start or end) of a slice (of rows).
+ * See those classes for more details.
+ */
+public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurableMemory, Clusterable
+{
+    public static final Serializer serializer = new Serializer();
+
+    /**
+     * The kind of clustering prefix this actually is.
+     *
+     * The kind {@code STATIC_CLUSTERING} is only implemented by {@link Clustering#STATIC_CLUSTERING} and {@code CLUSTERING} is
+     * implemented by the {@link Clustering} class. The rest is used by {@link Slice.Bound} and {@link RangeTombstone.Bound}.
+     */
+    public enum Kind
+    {
+        // WARNING: the ordering of this enum matters because we use ordinal() in the serialization
+
+        EXCL_END_BOUND(0, -1),
+        INCL_START_BOUND(1, -1),
+        EXCL_END_INCL_START_BOUNDARY(1, -1),
+        STATIC_CLUSTERING(2, -1),
+        CLUSTERING(3, 0),
+        INCL_END_EXCL_START_BOUNDARY(4, -1),
+        INCL_END_BOUND(4, 1),
+        EXCL_START_BOUND(5, 1);
+
+        private final int comparison;
+
+        // If clusterable c1 has this Kind and is a strict prefix of clusterable c2, then this
+        // is the result of compare(c1, c2). Basically, this is the same as comparing the kind of c1 to
+        // CLUSTERING.
+        public final int prefixComparisonResult;
+
+        private Kind(int comparison, int prefixComparisonResult)
+        {
+            this.comparison = comparison;
+            this.prefixComparisonResult = prefixComparisonResult;
+        }
+
+        /**
+         * Compares the 2 provided kinds.
+         * <p>
+         * Note: this should be used instead of {@link #compareTo} when comparing clustering prefixes. We do
+         * not override the latter method because it is final for an enum.
+         */
+        public static int compare(Kind k1, Kind k2)
+        {
+            return Integer.compare(k1.comparison, k2.comparison);
+        }
+
+        /**
+         * Returns the inverse of the current kind.
+         * <p>
+         * This inverts start into end (and vice-versa) and inclusive into exclusive (and vice-versa).
+         *
+         * @return the inverse of this kind. For instance, if this kind is an exclusive start, this returns
+         * an inclusive end.
+         */
+        public Kind invert()
+        {
+            switch (this)
+            {
+                case EXCL_START_BOUND: return INCL_END_BOUND;
+                case INCL_START_BOUND: return EXCL_END_BOUND;
+                case EXCL_END_BOUND: return INCL_START_BOUND;
+                case INCL_END_BOUND: return EXCL_START_BOUND;
+                case EXCL_END_INCL_START_BOUNDARY: return INCL_END_EXCL_START_BOUNDARY;
+                case INCL_END_EXCL_START_BOUNDARY: return EXCL_END_INCL_START_BOUNDARY;
+                default: return this;
+            }
+        }
+
+        public boolean isBound()
+        {
+            switch (this)
+            {
+                case INCL_START_BOUND:
+                case INCL_END_BOUND:
+                case EXCL_START_BOUND:
+                case EXCL_END_BOUND:
+                    return true;
+            }
+            return false;
+        }
+
+        public boolean isBoundary()
+        {
+            switch (this)
+            {
+                case INCL_END_EXCL_START_BOUNDARY:
+                case EXCL_END_INCL_START_BOUNDARY:
+                    return true;
+            }
+            return false;
+        }
+
+        public boolean isStart()
+        {
+            switch (this)
+            {
+                case INCL_START_BOUND:
+                case EXCL_END_INCL_START_BOUNDARY:
+                case INCL_END_EXCL_START_BOUNDARY:
+                case EXCL_START_BOUND:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        public boolean isEnd()
+        {
+            switch (this)
+            {
+                case INCL_END_BOUND:
+                case EXCL_END_INCL_START_BOUNDARY:
+                case INCL_END_EXCL_START_BOUNDARY:
+                case EXCL_END_BOUND:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        public boolean isOpen(boolean reversed)
+        {
+            return reversed ? isEnd() : isStart();
+        }
+
+        public boolean isClose(boolean reversed)
+        {
+            return reversed ? isStart() : isEnd();
+        }
+
+        public Kind closeBoundOfBoundary(boolean reversed)
+        {
+            assert isBoundary();
+            return reversed
+                 ? (this == INCL_END_EXCL_START_BOUNDARY ? EXCL_START_BOUND : INCL_START_BOUND)
+                 : (this == INCL_END_EXCL_START_BOUNDARY ? INCL_END_BOUND : EXCL_END_BOUND);
+        }
+
+        public Kind openBoundOfBoundary(boolean reversed)
+        {
+            assert isBoundary();
+            return reversed
+                 ? (this == INCL_END_EXCL_START_BOUNDARY ? INCL_END_BOUND : EXCL_END_BOUND)
+                 : (this == INCL_END_EXCL_START_BOUNDARY ? EXCL_START_BOUND : INCL_START_BOUND);
+        }
+    }
+
+    public Kind kind();
+
+    /**
+     * The number of values in this prefix.
+     *
+     * This can't be more than the number of clustering columns of the table this is a prefix of.
+     *
+     * @return the number of values in this prefix.
+     */
+    public int size();
+
+    /**
+     * Retrieves the ith value of this prefix.
+     *
+     * @param i the index of the value to retrieve. Must be such that {@code 0 <= i < size()}.
+     *
+     * @return the ith value of this prefix. Note that a value can be {@code null}.
+     */
+    public ByteBuffer get(int i);
+
+    public void digest(MessageDigest digest);
+
+    // Used to verify if batches go over a given size
+    public int dataSize();
+
+    public String toString(CFMetaData metadata);
+
+    public void writeTo(Writer writer);
+
+    /**
+     * The values of this prefix as an array.
+ * <p> + * Please note that this may or may not require an array creation. So 1) you should *not* + * modify the returned array and 2) it's more efficient to use {@link #size()} and + * {@link #get} unless you actually need an array. + * + * @return the values for this prefix as an array. + */ + public ByteBuffer[] getRawValues(); + + /** + * Interface for writing a clustering prefix. + * <p> + * Each value for the prefix should simply be written in order. + */ + public interface Writer + { + /** + * Write the next value to the writer. + * + * @param value the value to write. + */ + public void writeClusteringValue(ByteBuffer value); + } + + public static class Serializer + { + public void serialize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + // We shouldn't serialize static clusterings + assert clustering.kind() != Kind.STATIC_CLUSTERING; + if (clustering.kind() == Kind.CLUSTERING) + { + out.writeByte(clustering.kind().ordinal()); + Clustering.serializer.serialize((Clustering)clustering, out, version, types); + } + else + { + Slice.Bound.serializer.serialize((Slice.Bound)clustering, out, version, types); + } + } + + public ClusteringPrefix deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException + { + Kind kind = Kind.values()[in.readByte()]; + // We shouldn't serialize static clusterings + assert kind != Kind.STATIC_CLUSTERING; + if (kind == Kind.CLUSTERING) + return Clustering.serializer.deserialize(in, version, types); + else + return Slice.Bound.serializer.deserializeValues(in, kind, version, types); + } + + public long serializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes) + { + // We shouldn't serialize static clusterings + assert clustering.kind() != Kind.STATIC_CLUSTERING; + if (clustering.kind() == Kind.CLUSTERING) + return 1 + Clustering.serializer.serializedSize((Clustering)clustering, version, types, sizes); + else + return Slice.Bound.serializer.serializedSize((Slice.Bound)clustering, version, types, sizes); + } + + void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + if (clustering.size() == 0) + return; + + writeHeader(clustering, out); + for (int i = 0; i < clustering.size(); i++) + { + ByteBuffer v = clustering.get(i); + if (v == null || !v.hasRemaining()) + continue; // handled in the header + + types.get(i).writeValue(v, out); + } + } + + long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types, TypeSizes sizes) + { + if (clustering.size() == 0) + return 0; + + long size = headerBytesCount(clustering.size()); + for (int i = 0; i < clustering.size(); i++) + { + ByteBuffer v = clustering.get(i); + if (v == null || !v.hasRemaining()) + continue; // handled in the header + + size += types.get(i).writtenLength(v, sizes); + } + return size; + } + + void deserializeValuesWithoutSize(DataInput in, int size, int version, List<AbstractType<?>> types, ClusteringPrefix.Writer writer) throws IOException + { + if (size == 0) + return; + + int[] header = readHeader(size, in); + for (int i = 0; i < size; i++) + { + if (isNull(header, i)) + writer.writeClusteringValue(null); + else if (isEmpty(header, i)) + writer.writeClusteringValue(ByteBufferUtil.EMPTY_BYTE_BUFFER); + else + writer.writeClusteringValue(types.get(i).readValue(in)); + } + } + + private int headerBytesCount(int size) + { + // For 
each component, we store 2 bits to know if the component is empty or null (or neither).
+            // We thus handle 4 components per byte
+            return size / 4 + (size % 4 == 0 ? 0 : 1);
+        }
+
+        /**
+         * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those
+         * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode
+         * empty values too. So for that, every clustering prefix includes a "header" that contains 2 bits per element in the prefix. For each element,
+         * those 2 bits encode whether the element is null, empty, or none of those.
+         */
+        private void writeHeader(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+        {
+            int nbBytes = headerBytesCount(clustering.size());
+            for (int i = 0; i < nbBytes; i++)
+            {
+                int b = 0;
+                for (int j = 0; j < 4; j++)
+                {
+                    int c = i * 4 + j;
+                    if (c >= clustering.size())
+                        break;
+
+                    ByteBuffer v = clustering.get(c);
+                    if (v == null)
+                        b |= (1 << (j * 2) + 1);
+                    else if (!v.hasRemaining())
+                        b |= (1 << (j * 2));
+                }
+                out.writeByte((byte)b);
+            }
+        }
+
+        private int[] readHeader(int size, DataInput in) throws IOException
+        {
+            int nbBytes = headerBytesCount(size);
+            int[] header = new int[nbBytes];
+            for (int i = 0; i < nbBytes; i++)
+                header[i] = in.readUnsignedByte();
+            return header;
+        }
+
+        private boolean isNull(int[] header, int i)
+        {
+            int b = header[i / 4];
+            int mask = 1 << ((i % 4) * 2) + 1;
+            return (b & mask) != 0;
+        }
+
+        private boolean isEmpty(int[] header, int i)
+        {
+            int b = header[i / 4];
+            int mask = 1 << ((i % 4) * 2);
+            return (b & mask) != 0;
+        }
+    }
+
+    /**
+     * Helper class that makes the deserialization of clustering prefixes faster.
+     * <p>
+     * The main reason for this is that when we deserialize rows from sstables, there are many cases where we have
+     * a bunch of rows to skip at the beginning of an index block because those rows are before the requested slice.
+     * This class makes sure we can answer the question "is the next row on disk before the requested slice" with as
+     * little work as possible. It does that by providing a comparison method that deserializes only what is needed
+     * to decide the comparison.
+     */
+    public static class Deserializer
+    {
+        private final ClusteringComparator comparator;
+        private final DataInput in;
+        private final SerializationHeader serializationHeader;
+
+        private boolean nextIsRow;
+        private int[] nextHeader;
+
+        private int nextSize;
+        private ClusteringPrefix.Kind nextKind;
+        private int deserializedSize;
+        private final ByteBuffer[] nextValues;
+
+        public Deserializer(ClusteringComparator comparator, DataInput in, SerializationHeader header)
+        {
+            this.comparator = comparator;
+            this.in = in;
+            this.serializationHeader = header;
+            this.nextValues = new ByteBuffer[comparator.size()];
+        }
+
+        public void prepare(int flags) throws IOException
+        {
+            assert !UnfilteredSerializer.isStatic(flags) : "Flags = " + flags;
+            this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
+            this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
+            this.nextSize = nextIsRow ?
comparator.size() : in.readUnsignedShort(); + this.nextHeader = serializer.readHeader(nextSize, in); + this.deserializedSize = 0; + } + + public int compareNextTo(Slice.Bound bound) throws IOException + { + if (bound == Slice.Bound.TOP) + return -1; + + for (int i = 0; i < bound.size(); i++) + { + if (!hasComponent(i)) + return nextKind.prefixComparisonResult; + + int cmp = comparator.compareComponent(i, nextValues[i], bound.get(i)); + if (cmp != 0) + return cmp; + } + + if (bound.size() == nextSize) + return nextKind.compareTo(bound.kind()); + + // We know that we'll have exited already if nextSize < bound.size + return -bound.kind().prefixComparisonResult; + } + + private boolean hasComponent(int i) throws IOException + { + if (i >= nextSize) + return false; + + while (deserializedSize <= i) + deserializeOne(); + + return true; + } + + private boolean deserializeOne() throws IOException + { + if (deserializedSize == nextSize) + return false; + + int i = deserializedSize++; + nextValues[i] = serializer.isNull(nextHeader, i) + ? null + : (serializer.isEmpty(nextHeader, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : serializationHeader.clusteringTypes().get(i).readValue(in)); + return true; + } + + private void deserializeAll() throws IOException + { + while (deserializeOne()) + continue; + } + + public RangeTombstone.Bound.Kind deserializeNextBound(RangeTombstone.Bound.Writer writer) throws IOException + { + assert !nextIsRow; + deserializeAll(); + for (int i = 0; i < nextSize; i++) + writer.writeClusteringValue(nextValues[i]); + writer.writeBoundKind(nextKind); + return nextKind; + } + + public void deserializeNextClustering(Clustering.Writer writer) throws IOException + { + assert nextIsRow && nextSize == nextValues.length; + deserializeAll(); + for (int i = 0; i < nextSize; i++) + writer.writeClusteringValue(nextValues[i]); + } + + public ClusteringPrefix.Kind skipNext() throws IOException + { + for (int i = deserializedSize; i < nextSize; i++) + if (!serializer.isNull(nextHeader, i) && !serializer.isEmpty(nextHeader, i)) + serializationHeader.clusteringTypes().get(i).skipValue(in); + return nextKind; + } + } +}
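
The null/empty header scheme implemented by writeHeader, isNull and isEmpty above is easy to exercise on its own. Below is a minimal, self-contained sketch assuming the same 2-bits-per-component layout; the class and method names are illustrative, not part of this patch:

import java.nio.ByteBuffer;

// Illustrative sketch of the 2-bits-per-component header used by
// ClusteringPrefix.Serializer: for component i, bit ((i % 4) * 2) marks
// "empty" and bit ((i % 4) * 2 + 1) marks "null"; 4 components fit per byte.
public class ClusteringHeaderSketch
{
    static int headerBytesCount(int size)
    {
        // 2 bits per component, so 4 components per header byte
        return size / 4 + (size % 4 == 0 ? 0 : 1);
    }

    static int[] writeHeader(ByteBuffer[] components)
    {
        int[] header = new int[headerBytesCount(components.length)];
        for (int i = 0; i < components.length; i++)
        {
            ByteBuffer v = components[i];
            if (v == null)
                header[i / 4] |= 1 << ((i % 4) * 2 + 1); // null bit
            else if (!v.hasRemaining())
                header[i / 4] |= 1 << ((i % 4) * 2);     // empty bit
            // otherwise both bits stay 0: the value itself is serialized separately
        }
        return header;
    }

    static boolean isNull(int[] header, int i)
    {
        return (header[i / 4] & (1 << ((i % 4) * 2 + 1))) != 0;
    }

    static boolean isEmpty(int[] header, int i)
    {
        return (header[i / 4] & (1 << ((i % 4) * 2))) != 0;
    }

    public static void main(String[] args)
    {
        ByteBuffer[] prefix = { ByteBuffer.wrap(new byte[]{ 1 }), // regular value
                                ByteBuffer.allocate(0),           // empty
                                null };                           // null
        int[] header = writeHeader(prefix);
        for (int i = 0; i < prefix.length; i++)
            System.out.printf("component %d: null=%b empty=%b%n", i, isNull(header, i), isEmpty(header, i));
    }
}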
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java deleted file mode 100644 index 7f6d439..0000000 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; - -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.marshal.CounterColumnType; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.SearchIterator; -import org.apache.cassandra.utils.memory.HeapAllocator; - -public class CollationController -{ - private final ColumnFamilyStore cfs; - private final QueryFilter filter; - private final int gcBefore; - - private int sstablesIterated = 0; - - public CollationController(ColumnFamilyStore cfs, QueryFilter filter, int gcBefore) - { - this.cfs = cfs; - this.filter = filter; - this.gcBefore = gcBefore; - } - - public ColumnFamily getTopLevelColumns(boolean copyOnHeap) - { - return filter.filter instanceof NamesQueryFilter - && cfs.metadata.getDefaultValidator() != CounterColumnType.instance - ? collectTimeOrderedData(copyOnHeap) - : collectAllData(copyOnHeap); - } - - /** - * Collects data in order of recency, using the sstable maxtimestamp data. - * Once we have data for all requests columns that is newer than the newest remaining maxtimestamp, - * we stop. 
- */ - private ColumnFamily collectTimeOrderedData(boolean copyOnHeap) - { - final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed()); - List<OnDiskAtomIterator> iterators = new ArrayList<>(); - boolean isEmpty = true; - Tracing.trace("Acquiring sstable references"); - ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key)); - DeletionInfo returnDeletionInfo = container.deletionInfo(); - - try - { - Tracing.trace("Merging memtable contents"); - for (Memtable memtable : view.memtables) - { - ColumnFamily cf = memtable.getColumnFamily(filter.key); - if (cf != null) - { - filter.delete(container.deletionInfo(), cf); - isEmpty = false; - Iterator<Cell> iter = filter.getIterator(cf); - while (iter.hasNext()) - { - Cell cell = iter.next(); - if (copyOnHeap) - cell = cell.localCopy(cfs.metadata, HeapAllocator.instance); - container.addColumn(cell); - } - } - } - - // avoid changing the filter columns of the original filter - // (reduceNameFilter removes columns that are known to be irrelevant) - NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter; - TreeSet<CellName> filterColumns = new TreeSet<>(namesFilter.columns); - QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp); - - /* add the SSTables on disk */ - Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); - - // read sorted sstables - for (SSTableReader sstable : view.sstables) - { - // if we've already seen a row tombstone with a timestamp greater - // than the most recent update to this sstable, we're done, since the rest of the sstables - // will also be older - if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt) - break; - - long currentMaxTs = sstable.getMaxTimestamp(); - reduceNameFilter(reducedFilter, container, currentMaxTs); - if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty()) - break; - - Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation); - sstable.incrementReadCount(); - OnDiskAtomIterator iter = reducedFilter.getSSTableColumnIterator(sstable); - iterators.add(iter); - isEmpty = false; - if (iter.getColumnFamily() != null) - { - container.delete(iter.getColumnFamily()); - sstablesIterated++; - while (iter.hasNext()) - container.addAtom(iter.next()); - } - } - - // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) - // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower) - if (isEmpty) - return null; - - // do a final collate. toCollate is boilerplate required to provide a CloseableIterator - ColumnFamily returnCF = container.cloneMeShallow(); - Tracing.trace("Collating all results"); - filter.collateOnDiskAtom(returnCF, container.iterator(), gcBefore); - - // "hoist up" the requested data into a more recent sstable - if (sstablesIterated > cfs.getMinimumCompactionThreshold() - && !cfs.isAutoCompactionDisabled() - && cfs.getCompactionStrategyManager().shouldDefragment()) - { - // !!WARNING!! 
if we stop copying our data to a heap-managed object, - // we will need to track the lifetime of this mutation as well - Tracing.trace("Defragmenting requested data"); - final Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe()); - StageManager.getStage(Stage.MUTATION).execute(new Runnable() - { - public void run() - { - // skipping commitlog and index updates is fine since we're just de-fragmenting existing data - Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false); - } - }); - } - - // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly: - return returnCF; - } - finally - { - for (OnDiskAtomIterator iter : iterators) - FileUtils.closeQuietly(iter); - } - } - - /** - * remove columns from @param filter where we already have data in @param container newer than @param sstableTimestamp - */ - private void reduceNameFilter(QueryFilter filter, ColumnFamily container, long sstableTimestamp) - { - if (container == null) - return; - - SearchIterator<CellName, Cell> searchIter = container.searchIterator(); - for (Iterator<CellName> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext() && searchIter.hasNext(); ) - { - CellName filterColumn = iterator.next(); - Cell cell = searchIter.next(filterColumn); - if (cell != null && cell.timestamp() > sstableTimestamp) - iterator.remove(); - } - } - - /** - * Collects data the brute-force way: gets an iterator for the filter in question - * from every memtable and sstable, then merges them together. - */ - private ColumnFamily collectAllData(boolean copyOnHeap) - { - Tracing.trace("Acquiring sstable references"); - ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key)); - List<Iterator<? extends OnDiskAtom>> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); - ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed()); - DeletionInfo returnDeletionInfo = returnCF.deletionInfo(); - try - { - Tracing.trace("Merging memtable tombstones"); - for (Memtable memtable : view.memtables) - { - final ColumnFamily cf = memtable.getColumnFamily(filter.key); - if (cf != null) - { - filter.delete(returnDeletionInfo, cf); - Iterator<Cell> iter = filter.getIterator(cf); - if (copyOnHeap) - { - iter = Iterators.transform(iter, new Function<Cell, Cell>() - { - public Cell apply(Cell cell) - { - return cell.localCopy(cf.metadata, HeapAllocator.instance); - } - }); - } - iterators.add(iter); - } - } - - /* - * We can't eliminate full sstables based on the timestamp of what we've already read like - * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone - * we've read. We still rely on the sstable ordering by maxTimestamp since if - * maxTimestamp_s1 > maxTimestamp_s0, - * we're guaranteed that s1 cannot have a row tombstone such that - * timestamp(tombstone) > maxTimestamp_s0 - * since we necessarily have - * timestamp(tombstone) <= maxTimestamp_s1 - * In other words, iterating in maxTimestamp order allow to do our mostRecentTombstone elimination - * in one pass, and minimize the number of sstables for which we read a rowTombstone. 
- */ - Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); - List<SSTableReader> skippedSSTables = null; - long minTimestamp = Long.MAX_VALUE; - int nonIntersectingSSTables = 0; - - for (SSTableReader sstable : view.sstables) - { - minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp()); - // if we've already seen a row tombstone with a timestamp greater - // than the most recent update to this sstable, we can skip it - if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt) - break; - - if (!filter.shouldInclude(sstable)) - { - nonIntersectingSSTables++; - // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely - if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE) - { - if (skippedSSTables == null) - skippedSSTables = new ArrayList<>(); - skippedSSTables.add(sstable); - } - continue; - } - - sstable.incrementReadCount(); - OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable); - iterators.add(iter); - if (iter.getColumnFamily() != null) - { - ColumnFamily cf = iter.getColumnFamily(); - returnCF.delete(cf); - sstablesIterated++; - } - } - - int includedDueToTombstones = 0; - // Check for row tombstone in the skipped sstables - if (skippedSSTables != null) - { - for (SSTableReader sstable : skippedSSTables) - { - if (sstable.getMaxTimestamp() <= minTimestamp) - continue; - - sstable.incrementReadCount(); - OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable); - ColumnFamily cf = iter.getColumnFamily(); - // we are only interested in row-level tombstones here, and only if markedForDeleteAt is larger than minTimestamp - if (cf != null && cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt > minTimestamp) - { - includedDueToTombstones++; - iterators.add(iter); - returnCF.delete(cf.deletionInfo().getTopLevelDeletion()); - sstablesIterated++; - } - else - { - FileUtils.closeQuietly(iter); - } - } - } - - if (Tracing.isTracing()) - Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones", - nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); - - // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) - // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower) - if (iterators.isEmpty()) - return null; - - Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated); - filter.collateOnDiskAtom(returnCF, iterators, gcBefore); - - // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly: - return returnCF; - } - finally - { - for (Object iter : iterators) - if (iter instanceof Closeable) - FileUtils.closeQuietly((Closeable) iter); - } - } - - public int getSstablesIterated() - { - return sstablesIterated; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java deleted file mode 100644 index a7243a2..0000000 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.MessageDigest; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import com.google.common.collect.ImmutableMap; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import org.apache.cassandra.cache.IRowCacheEntry; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.CellNames; -import org.apache.cassandra.db.filter.ColumnCounter; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.io.sstable.ColumnNameHelper; -import org.apache.cassandra.io.sstable.ColumnStats; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.*; - -/** - * A sorted map of columns. - * This represents the backing map of a colum family. - * - * Whether the implementation is thread safe or not is left to the - * implementing classes. - */ -public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry -{ - /* The column serializer for this Column Family. Create based on config. */ - public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer(); - - protected final CFMetaData metadata; - - protected ColumnFamily(CFMetaData metadata) - { - assert metadata != null; - this.metadata = metadata; - } - - public <T extends ColumnFamily> T cloneMeShallow(ColumnFamily.Factory<T> factory, boolean reversedInsertOrder) - { - T cf = factory.create(metadata, reversedInsertOrder); - cf.delete(this); - return cf; - } - - public ColumnFamily cloneMeShallow() - { - return cloneMeShallow(false); - } - - public ColumnFamily cloneMeShallow(boolean reversed) - { - return cloneMeShallow(getFactory(), reversed); - } - - public ColumnFamilyType getType() - { - return metadata.cfType; - } - - public int liveCQL3RowCount(long now) - { - ColumnCounter counter = getComparator().isDense() - ? new ColumnCounter(now) - : new ColumnCounter.GroupByPrefix(now, getComparator(), metadata.clusteringColumns().size()); - return counter.countAll(this).live(); - } - - /** - * Clones the column map. 
- */ - public abstract ColumnFamily cloneMe(); - - public UUID id() - { - return metadata.cfId; - } - - /** - * @return The CFMetaData for this row - */ - public CFMetaData metadata() - { - return metadata; - } - - public void addColumn(CellName name, ByteBuffer value, long timestamp) - { - addColumn(name, value, timestamp, 0); - } - - public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive) - { - assert !metadata().isCounter(); - Cell cell = AbstractCell.create(name, value, timestamp, timeToLive, metadata()); - addColumn(cell); - } - - public void addCounter(CellName name, long value) - { - addColumn(new BufferCounterUpdateCell(name, value, FBUtilities.timestampMicros())); - } - - public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp) - { - addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp)); - } - - public void addTombstone(CellName name, int localDeletionTime, long timestamp) - { - addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp)); - } - - public void addAtom(OnDiskAtom atom) - { - if (atom instanceof Cell) - { - addColumn((Cell)atom); - } - else - { - assert atom instanceof RangeTombstone; - delete((RangeTombstone)atom); - } - } - - /** - * Clear this column family, removing all columns and deletion info. - */ - public abstract void clear(); - - /** - * Returns a {@link DeletionInfo.InOrderTester} for the deletionInfo() of - * this column family. Please note that for ThreadSafe implementation of ColumnFamily, - * this tester will remain valid even if new tombstones are added to this ColumnFamily - * *as long as said addition is done in comparator order*. For AtomicSortedColumns, - * the tester will correspond to the state of when this method is called. - */ - public DeletionInfo.InOrderTester inOrderDeletionTester() - { - return deletionInfo().inOrderTester(); - } - - /** - * Returns the factory used for this ISortedColumns implementation. - */ - public abstract Factory getFactory(); - - public abstract DeletionInfo deletionInfo(); - public abstract void setDeletionInfo(DeletionInfo info); - - public abstract void delete(DeletionInfo info); - public abstract void delete(DeletionTime deletionTime); - protected abstract void delete(RangeTombstone tombstone); - - public abstract SearchIterator<CellName, Cell> searchIterator(); - - /** - * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore. - * @param gcBefore a timestamp (in seconds) before which tombstones should be purged - */ - public abstract void purgeTombstones(int gcBefore); - - /** - * Adds a cell to this cell map. - * If a cell with the same name is already present in the map, it will - * be replaced by the newly added cell. - */ - public abstract void addColumn(Cell cell); - - /** - * Adds a cell if it's non-gc-able and isn't shadowed by a partition/range tombstone with a higher timestamp. - * Requires that the cell to add is sorted strictly after the last cell in the container. - */ - public abstract void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore); - - /** - * Appends a cell. Requires that the cell to add is sorted strictly after the last cell in the container. - */ - public abstract void appendColumn(Cell cell); - - /** - * Adds all the columns of a given column map to this column map. - * This is equivalent to: - * <code> - * for (Cell c : cm) - * addColumn(c, ...); - * </code> - * but is potentially faster. 
- */ - public abstract void addAll(ColumnFamily cm); - - /** - * Get a column given its name, returning null if the column is not - * present. - */ - public abstract Cell getColumn(CellName name); - - /** - * Returns an iterable with the names of columns in this column map in the same order - * as the underlying columns themselves. - */ - public abstract Iterable<CellName> getColumnNames(); - - /** - * Returns the columns of this column map as a collection. - * The columns in the returned collection should be sorted as the columns - * in this map. - */ - public abstract Collection<Cell> getSortedColumns(); - - /** - * Returns the columns of this column map as a collection. - * The columns in the returned collection should be sorted in reverse - * order of the columns in this map. - */ - public abstract Collection<Cell> getReverseSortedColumns(); - - /** - * Returns the number of columns in this map. - */ - public abstract int getColumnCount(); - - /** - * Returns whether or not there are any columns present. - */ - public abstract boolean hasColumns(); - - /** - * Returns true if this contains no columns or deletion info - */ - public boolean isEmpty() - { - return deletionInfo().isLive() && !hasColumns(); - } - - /** - * Returns an iterator over the columns of this map that returns only the matching @param slices. - * The provided slices must be in order and must be non-overlapping. - */ - public abstract Iterator<Cell> iterator(ColumnSlice[] slices); - - /** - * Returns a reversed iterator over the columns of this map that returns only the matching @param slices. - * The provided slices must be in reversed order and must be non-overlapping. - */ - public abstract Iterator<Cell> reverseIterator(ColumnSlice[] slices); - - /** - * Returns if this map only support inserts in reverse order. - */ - public abstract boolean isInsertReversed(); - - /** - * If `columns` has any tombstones (top-level or range tombstones), they will be applied to this set of columns. - */ - public void delete(ColumnFamily columns) - { - delete(columns.deletionInfo()); - } - - /* - * This function will calculate the difference between 2 column families. - * The external input is assumed to be a superset of internal. - */ - public ColumnFamily diff(ColumnFamily cfComposite) - { - assert cfComposite.id().equals(id()); - ColumnFamily cfDiff = ArrayBackedSortedColumns.factory.create(metadata); - cfDiff.delete(cfComposite.deletionInfo()); - - // (don't need to worry about cfNew containing Columns that are shadowed by - // the delete tombstone, since cfNew was generated by CF.resolve, which - // takes care of those for us.) 
- for (Cell cellExternal : cfComposite) - { - CellName cName = cellExternal.name(); - Cell cellInternal = getColumn(cName); - if (cellInternal == null) - { - cfDiff.addColumn(cellExternal); - } - else - { - Cell cellDiff = cellInternal.diff(cellExternal); - if (cellDiff != null) - { - cfDiff.addColumn(cellDiff); - } - } - } - - cfDiff.setDeletionInfo(deletionInfo().diff(cfComposite.deletionInfo())); - - if (!cfDiff.isEmpty()) - return cfDiff; - - return null; - } - - public long dataSize() - { - long size = 0; - for (Cell cell : this) - size += cell.cellDataSize(); - return size; - } - - public long maxTimestamp() - { - long maxTimestamp = deletionInfo().maxTimestamp(); - for (Cell cell : this) - maxTimestamp = Math.max(maxTimestamp, cell.timestamp()); - return maxTimestamp; - } - - @Override - public int hashCode() - { - HashCodeBuilder builder = new HashCodeBuilder(373, 75437) - .append(metadata) - .append(deletionInfo()); - for (Cell cell : this) - builder.append(cell); - return builder.toHashCode(); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - return true; - if (o == null || !(o instanceof ColumnFamily)) - return false; - - ColumnFamily comparison = (ColumnFamily) o; - - return metadata.equals(comparison.metadata) - && deletionInfo().equals(comparison.deletionInfo()) - && ByteBufferUtil.compareUnsigned(digest(this), digest(comparison)) == 0; - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder("ColumnFamily("); - sb.append(metadata.cfName); - - if (isMarkedForDelete()) - sb.append(" -").append(deletionInfo()).append("-"); - - sb.append(" [").append(CellNames.getColumnsString(getComparator(), this)).append("])"); - return sb.toString(); - } - - public static ByteBuffer digest(ColumnFamily cf) - { - MessageDigest digest = FBUtilities.threadLocalMD5Digest(); - if (cf != null) - cf.updateDigest(digest); - return ByteBuffer.wrap(digest.digest()); - } - - public void updateDigest(MessageDigest digest) - { - for (Cell cell : this) - cell.updateDigest(digest); - - deletionInfo().updateDigest(digest); - } - - public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2) - { - if (cf1 == null) - return cf2; - return cf1.diff(cf2); - } - - public ColumnStats getColumnStats() - { - // note that we default to MIN_VALUE/MAX_VALUE here to be able to override them later in this method - // we are checking row/range tombstones and actual cells - there should always be data that overrides - // these with actual values - ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); - ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); - StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); - ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); - List<ByteBuffer> minColumnNamesSeen = Collections.emptyList(); - List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList(); - boolean hasLegacyCounterShards = false; - - if (deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE) - { - tombstones.update(deletionInfo().getTopLevelDeletion().localDeletionTime); - maxDeletionTimeTracker.update(deletionInfo().getTopLevelDeletion().localDeletionTime); - minTimestampTracker.update(deletionInfo().getTopLevelDeletion().markedForDeleteAt); - maxTimestampTracker.update(deletionInfo().getTopLevelDeletion().markedForDeleteAt); - } - Iterator<RangeTombstone> it = 
deletionInfo().rangeIterator(); - while (it.hasNext()) - { - RangeTombstone rangeTombstone = it.next(); - tombstones.update(rangeTombstone.getLocalDeletionTime()); - minTimestampTracker.update(rangeTombstone.timestamp()); - maxTimestampTracker.update(rangeTombstone.timestamp()); - maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime()); - minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, rangeTombstone.min, metadata.comparator); - maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, rangeTombstone.max, metadata.comparator); - } - - for (Cell cell : this) - { - minTimestampTracker.update(cell.timestamp()); - maxTimestampTracker.update(cell.timestamp()); - maxDeletionTimeTracker.update(cell.getLocalDeletionTime()); - - int deletionTime = cell.getLocalDeletionTime(); - if (deletionTime < Integer.MAX_VALUE) - tombstones.update(deletionTime); - minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name(), metadata.comparator); - maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name(), metadata.comparator); - if (cell instanceof CounterCell) - hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards(); - } - return new ColumnStats(getColumnCount(), - minTimestampTracker.get(), - maxTimestampTracker.get(), - maxDeletionTimeTracker.get(), - tombstones, - minColumnNamesSeen, - maxColumnNamesSeen, - hasLegacyCounterShards); - } - - public boolean isMarkedForDelete() - { - return !deletionInfo().isLive(); - } - - /** - * @return the comparator whose sorting order the contained columns conform to - */ - public CellNameType getComparator() - { - return metadata.comparator; - } - - public boolean hasOnlyTombstones(long now) - { - for (Cell cell : this) - if (cell.isLive(now)) - return false; - return true; - } - - public Iterator<Cell> iterator() - { - return getSortedColumns().iterator(); - } - - public Iterator<Cell> reverseIterator() - { - return getReverseSortedColumns().iterator(); - } - - public Map<CellName, ByteBuffer> asMap() - { - ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder(); - for (Cell cell : this) - builder.put(cell.name(), cell.value()); - return builder.build(); - } - - public static ColumnFamily fromBytes(ByteBuffer bytes) - { - if (bytes == null) - return null; - - try - { - return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), - ArrayBackedSortedColumns.factory, - ColumnSerializer.Flag.LOCAL, - MessagingService.current_version); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - public ByteBuffer toBytes() - { - try (DataOutputBuffer out = new DataOutputBuffer()) - { - serializer.serialize(this, out, MessagingService.current_version); - return ByteBuffer.wrap(out.getData(), 0, out.getLength()); - } - } - - - /** - * @return an iterator where the removes are carried out once everything has been iterated - */ - public abstract BatchRemoveIterator<Cell> batchRemoveIterator(); - - public abstract static class Factory <T extends ColumnFamily> - { - /** - * Returns a (initially empty) column map whose columns are sorted - * according to the provided comparator. - * The {@code insertReversed} flag is an hint on how we expect insertion to be perfomed, - * either in sorted or reverse sorted order. This is used by ArrayBackedSortedColumns to - * allow optimizing for both forward and reversed slices. This does not matter for ThreadSafeSortedColumns. 
- * Note that this is only an hint on how we expect to do insertion, this does not change the map sorting. - */ - public abstract T create(CFMetaData metadata, boolean insertReversed, int initialCapacity); - - public T create(CFMetaData metadata, boolean insertReversed) - { - return create(metadata, insertReversed, 0); - } - - public T create(CFMetaData metadata) - { - return create(metadata, false); - } - - public T create(String keyspace, String cfName) - { - return create(Schema.instance.getCFMetaData(keyspace, cfName)); - } - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java deleted file mode 100644 index 928c21f..0000000 --- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.IOException; -import java.util.UUID; - -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.io.ISSTableSerializer; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.UUIDSerializer; - -public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily>, ISSTableSerializer<ColumnFamily> -{ - /* - * Serialized ColumnFamily format: - * - * [serialized for intra-node writes only, e.g. 
returning a query result] - * <cf nullability boolean: false if the cf is null> - * <cf id> - * - * [in sstable only] - * <column bloom filter> - * <sparse column index, start/finish columns every ColumnIndexSizeInKB of data> - * - * [always present] - * <local deletion time> - * <client-provided deletion time> - * <column count> - * <columns, serialized individually> - */ - public void serialize(ColumnFamily cf, DataOutputPlus out, int version) - { - try - { - if (cf == null) - { - out.writeBoolean(false); - return; - } - - out.writeBoolean(true); - serializeCfId(cf.id(), out, version); - cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version); - ColumnSerializer columnSerializer = cf.getComparator().columnSerializer(); - int count = cf.getColumnCount(); - out.writeInt(count); - int written = 0; - for (Cell cell : cf) - { - columnSerializer.serialize(cell, out); - written++; - } - assert count == written: "Table had " + count + " columns, but " + written + " written"; - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - public ColumnFamily deserialize(DataInput in, int version) throws IOException - { - return deserialize(in, ColumnSerializer.Flag.LOCAL, version); - } - - public ColumnFamily deserialize(DataInput in, ColumnSerializer.Flag flag, int version) throws IOException - { - return deserialize(in, ArrayBackedSortedColumns.factory, flag, version); - } - - public ColumnFamily deserialize(DataInput in, ColumnFamily.Factory factory, ColumnSerializer.Flag flag, int version) throws IOException - { - if (!in.readBoolean()) - return null; - - ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version))); - - if (cf.metadata().isSuper() && version < MessagingService.VERSION_20) - { - SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version); - } - else - { - cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version)); - - ColumnSerializer columnSerializer = cf.getComparator().columnSerializer(); - int size = in.readInt(); - for (int i = 0; i < size; ++i) - cf.addColumn(columnSerializer.deserialize(in, flag)); - } - return cf; - } - - public long contentSerializedSize(ColumnFamily cf, TypeSizes typeSizes, int version) - { - long size = cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version); - size += typeSizes.sizeof(cf.getColumnCount()); - ColumnSerializer columnSerializer = cf.getComparator().columnSerializer(); - for (Cell cell : cf) - size += columnSerializer.serializedSize(cell, typeSizes); - return size; - } - - public long serializedSize(ColumnFamily cf, TypeSizes typeSizes, int version) - { - if (cf == null) - { - return typeSizes.sizeof(false); - } - else - { - return typeSizes.sizeof(true) /* nullness bool */ - + cfIdSerializedSize(cf.id(), typeSizes, version) /* id */ - + contentSerializedSize(cf, typeSizes, version); - } - } - - public long serializedSize(ColumnFamily cf, int version) - { - return serializedSize(cf, TypeSizes.NATIVE, version); - } - - public void serializeForSSTable(ColumnFamily cf, DataOutputPlus out) - { - // Column families shouldn't be written directly to disk, use ColumnIndex.Builder instead - throw new UnsupportedOperationException(); - } - - public ColumnFamily deserializeFromSSTable(DataInput in, Version version) - { - throw new UnsupportedOperationException(); - } - - public void serializeCfId(UUID cfId, DataOutputPlus out, int version) throws IOException - { - 
UUIDSerializer.serializer.serialize(cfId, out, version); - } - - public UUID deserializeCfId(DataInput in, int version) throws IOException - { - UUID cfId = UUIDSerializer.serializer.deserialize(in, version); - if (Schema.instance.getCF(cfId) == null) - throw new UnknownColumnFamilyException("Couldn't find cfId=" + cfId, cfId); - - return cfId; - } - - public int cfIdSerializedSize(UUID cfId, TypeSizes typeSizes, int version) - { - return typeSizes.sizeof(cfId); - } -}
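
The wire layout documented at the top of this removed serializer (a presence boolean, then the cf id, then deletion info, column count and columns) is a simple framing pattern. Here is a minimal, self-contained sketch of just that framing, with a placeholder string standing in for the real payload; all names are illustrative:

import java.io.*;
import java.util.UUID;

// Sketch of the <nullability boolean><cf id><payload> framing described in
// ColumnFamilySerializer. The payload is a placeholder string here; the real
// serializer wrote deletion info, a column count and the serialized cells.
public class NullableFramingSketch
{
    static void serialize(UUID cfId, String payload, DataOutput out) throws IOException
    {
        if (payload == null)
        {
            out.writeBoolean(false); // nullability flag: nothing else follows
            return;
        }
        out.writeBoolean(true);
        out.writeLong(cfId.getMostSignificantBits());  // cf id, 16 bytes
        out.writeLong(cfId.getLeastSignificantBits());
        out.writeUTF(payload);                         // stand-in for the real content
    }

    static String deserialize(DataInput in) throws IOException
    {
        if (!in.readBoolean())
            return null; // a null cf was serialized
        UUID cfId = new UUID(in.readLong(), in.readLong());
        return cfId + ": " + in.readUTF();
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serialize(UUID.randomUUID(), "hello", new DataOutputStream(bytes));
        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(deserialize(in));
    }
}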