http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java deleted file mode 100644 index 9ef0c14..0000000 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ /dev/null @@ -1,578 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.util.AbstractCollection; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterators; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.SearchIterator; -import org.apache.cassandra.utils.btree.BTree; -import org.apache.cassandra.utils.btree.BTreeSearchIterator; -import org.apache.cassandra.utils.btree.UpdateFunction; -import org.apache.cassandra.utils.concurrent.Locks; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.HeapAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; -import org.apache.cassandra.utils.memory.NativePool; - -import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; - -/** - * A thread-safe and atomic ISortedColumns implementation. - * Operations (in particular addAll) on this implemenation are atomic and - * isolated (in the sense of ACID). Typically a addAll is guaranteed that no - * other thread can see the state where only parts but not all columns have - * been added. - * <p> - * WARNING: removing element through getSortedColumns().iterator() is *not* supported - * </p> - */ -public class AtomicBTreeColumns extends ColumnFamily -{ - static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.denseCFMetaData("keyspace", "table", BytesType.instance), null)) - + ObjectSizes.measure(new Holder(null, null)); - - // Reserved values for wasteTracker field. 
These values must not be consecutive (see avoidReservedValues) - private static final int TRACKER_NEVER_WASTED = 0; - private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE; - - // The granularity with which we track wasted allocation/work; we round up - private static final int ALLOCATION_GRANULARITY_BYTES = 1024; - // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below) - private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L; - private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES); - // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior - private static final int CLOCK_SHIFT = 17; - // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms - - /** - * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by - * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s - * - * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes - * we increment the current value if it is within this window, and set it to the min of the window plus our waste - * otherwise. - */ - private volatile int wasteTracker = TRACKER_NEVER_WASTED; - - private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker"); - - private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>() - { - public CellName apply(Cell column) - { - return column.name(); - } - }; - - public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>() - { - public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed, int initialCapacity) - { - if (insertReversed) - throw new IllegalArgumentException(); - return new AtomicBTreeColumns(metadata); - } - }; - - private static final DeletionInfo LIVE = DeletionInfo.live(); - // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class, - // so we can safely alias one DeletionInfo.live() reference and avoid some allocations. 
- private static final Holder EMPTY = new Holder(BTree.empty(), LIVE); - - private volatile Holder ref; - - private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref"); - - private AtomicBTreeColumns(CFMetaData metadata) - { - this(metadata, EMPTY); - } - - private AtomicBTreeColumns(CFMetaData metadata, Holder holder) - { - super(metadata); - this.ref = holder; - } - - public Factory getFactory() - { - return factory; - } - - public ColumnFamily cloneMe() - { - return new AtomicBTreeColumns(metadata, ref); - } - - public DeletionInfo deletionInfo() - { - return ref.deletionInfo; - } - - public void delete(DeletionTime delTime) - { - delete(new DeletionInfo(delTime)); - } - - protected void delete(RangeTombstone tombstone) - { - delete(new DeletionInfo(tombstone, getComparator())); - } - - public SearchIterator<CellName, Cell> searchIterator() - { - return new BTreeSearchIterator<>(ref.tree, asymmetricComparator()); - } - - public void delete(DeletionInfo info) - { - if (info.isLive()) - return; - - // Keeping deletion info for max markedForDeleteAt value - while (true) - { - Holder current = ref; - DeletionInfo curDelInfo = current.deletionInfo; - DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? curDelInfo.copy().add(info) : curDelInfo; - if (refUpdater.compareAndSet(this, current, current.with(newDelInfo))) - break; - } - } - - public void setDeletionInfo(DeletionInfo newInfo) - { - ref = ref.with(newInfo); - } - - public void purgeTombstones(int gcBefore) - { - while (true) - { - Holder current = ref; - if (!current.deletionInfo.hasPurgeableTombstones(gcBefore)) - break; - - DeletionInfo purgedInfo = current.deletionInfo.copy(); - purgedInfo.purge(gcBefore); - if (refUpdater.compareAndSet(this, current, current.with(purgedInfo))) - break; - } - } - - /** - * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it. 
- * - * @return the difference in size seen after merging the given columns - */ - public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) - { - ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer); - DeletionInfo inputDeletionInfoCopy = null; - - boolean monitorOwned = false; - try - { - if (usePessimisticLocking()) - { - Locks.monitorEnterUnsafe(this); - monitorOwned = true; - } - while (true) - { - Holder current = ref; - updater.ref = current; - updater.reset(); - - DeletionInfo deletionInfo; - if (cm.deletionInfo().mayModify(current.deletionInfo)) - { - if (inputDeletionInfoCopy == null) - inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance); - - deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); - updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); - } - else - { - deletionInfo = current.deletionInfo; - } - - Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater); - - if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo))) - { - indexer.updateRowLevelIndexes(); - updater.finish(); - return Pair.create(updater.dataSize, updater.colUpdateTimeDelta); - } - else if (!monitorOwned) - { - boolean shouldLock = usePessimisticLocking(); - if (!shouldLock) - { - shouldLock = updateWastedAllocationTracker(updater.heapSize); - } - if (shouldLock) - { - Locks.monitorEnterUnsafe(this); - monitorOwned = true; - } - } - } - } - finally - { - if (monitorOwned) - Locks.monitorExitUnsafe(this); - } - } - - boolean usePessimisticLocking() - { - return wasteTracker == TRACKER_PESSIMISTIC_LOCKING; - } - - /** - * Update the wasted allocation tracker state based on newly wasted allocation information - * - * @param wastedBytes the number of bytes wasted by this thread - * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached - */ - private boolean updateWastedAllocationTracker(long wastedBytes) { - // Early check for huge allocation that exceeds the limit - if (wastedBytes < EXCESS_WASTE_BYTES) - { - // We round up to ensure work < granularity are still accounted for - int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES; - - int oldTrackerValue; - while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker)) - { - // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap) - int time = (int) (System.nanoTime() >>> CLOCK_SHIFT); - int delta = oldTrackerValue - time; - if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET) - delta = -EXCESS_WASTE_OFFSET; - delta += wastedAllocation; - if (delta >= 0) - break; - if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta))) - return false; - } - } - // We have definitely reached our waste limit so set the state if it isn't already - wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING); - // And tell the caller to proceed with pessimistic locking - return true; - } - - private static int avoidReservedValues(int wasteTracker) - { - if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING) - return wasteTracker + 1; - return wasteTracker; - } - - // no 
particular reason not to implement these next methods, we just haven't needed them yet - - public void addColumn(Cell column) - { - throw new UnsupportedOperationException(); - } - - public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore) - { - throw new UnsupportedOperationException(); - } - - public void appendColumn(Cell cell) - { - throw new UnsupportedOperationException(); - } - - public void addAll(ColumnFamily cf) - { - throw new UnsupportedOperationException(); - } - - public void clear() - { - throw new UnsupportedOperationException(); - } - - public Cell getColumn(CellName name) - { - return (Cell) BTree.find(ref.tree, asymmetricComparator(), name); - } - - private Comparator<Object> asymmetricComparator() - { - return metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof NativePool); - } - - public Iterable<CellName> getColumnNames() - { - return collection(false, NAME); - } - - public Collection<Cell> getSortedColumns() - { - return collection(true, Functions.<Cell>identity()); - } - - public Collection<Cell> getReverseSortedColumns() - { - return collection(false, Functions.<Cell>identity()); - } - - private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f) - { - final Holder ref = this.ref; - return new AbstractCollection<V>() - { - public Iterator<V> iterator() - { - return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f); - } - - public int size() - { - return BTree.slice(ref.tree, true).count(); - } - }; - } - - public int getColumnCount() - { - return BTree.slice(ref.tree, true).count(); - } - - public boolean hasColumns() - { - return !BTree.isEmpty(ref.tree); - } - - public Iterator<Cell> iterator(ColumnSlice[] slices) - { - return slices.length == 1 - ? slice(ref.tree, asymmetricComparator(), slices[0].start, slices[0].finish, true) - : new SliceIterator(ref.tree, asymmetricComparator(), true, slices); - } - - public Iterator<Cell> reverseIterator(ColumnSlice[] slices) - { - return slices.length == 1 - ? 
slice(ref.tree, asymmetricComparator(), slices[0].finish, slices[0].start, false) - : new SliceIterator(ref.tree, asymmetricComparator(), false, slices); - } - - public boolean isInsertReversed() - { - return false; - } - - public BatchRemoveIterator<Cell> batchRemoveIterator() - { - throw new UnsupportedOperationException(); - } - - private static final class Holder - { - final DeletionInfo deletionInfo; - // the btree of columns - final Object[] tree; - - Holder(Object[] tree, DeletionInfo deletionInfo) - { - this.tree = tree; - this.deletionInfo = deletionInfo; - } - - Holder with(DeletionInfo info) - { - return new Holder(this.tree, info); - } - } - - // the function we provide to the btree utilities to perform any column replacements - private static final class ColumnUpdater implements UpdateFunction<Cell> - { - final AtomicBTreeColumns updating; - final CFMetaData metadata; - final MemtableAllocator allocator; - final OpOrder.Group writeOp; - final Updater indexer; - Holder ref; - long dataSize; - long heapSize; - long colUpdateTimeDelta = Long.MAX_VALUE; - final MemtableAllocator.DataReclaimer reclaimer; - List<Cell> inserted; // TODO: replace with walk of aborted BTree - - private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) - { - this.updating = updating; - this.allocator = allocator; - this.writeOp = writeOp; - this.indexer = indexer; - this.metadata = metadata; - this.reclaimer = allocator.reclaimer(); - } - - public Cell apply(Cell insert) - { - indexer.insert(insert); - insert = insert.localCopy(metadata, allocator, writeOp); - this.dataSize += insert.cellDataSize(); - this.heapSize += insert.unsharedHeapSizeExcludingData(); - if (inserted == null) - inserted = new ArrayList<>(); - inserted.add(insert); - return insert; - } - - public Cell apply(Cell existing, Cell update) - { - Cell reconciled = existing.reconcile(update); - indexer.update(existing, reconciled); - if (existing != reconciled) - { - reconciled = reconciled.localCopy(metadata, allocator, writeOp); - dataSize += reconciled.cellDataSize() - existing.cellDataSize(); - heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData(); - if (inserted == null) - inserted = new ArrayList<>(); - inserted.add(reconciled); - discard(existing); - //Getting the minimum delta for an update containing multiple columns - colUpdateTimeDelta = Math.min(Math.abs(existing.timestamp() - update.timestamp()), colUpdateTimeDelta); - } - return reconciled; - } - - protected void reset() - { - this.dataSize = 0; - this.heapSize = 0; - if (inserted != null) - { - for (Cell cell : inserted) - abort(cell); - inserted.clear(); - } - reclaimer.cancel(); - } - - protected void abort(Cell abort) - { - reclaimer.reclaimImmediately(abort); - } - - protected void discard(Cell discard) - { - reclaimer.reclaim(discard); - } - - public boolean abortEarly() - { - return updating.ref != ref; - } - - public void allocated(long heapSize) - { - this.heapSize += heapSize; - } - - protected void finish() - { - allocator.onHeap().allocate(heapSize, writeOp); - reclaimer.commit(); - } - } - - private static class SliceIterator extends AbstractIterator<Cell> - { - private final Object[] btree; - private final boolean forwards; - private final Comparator<Object> comparator; - private final ColumnSlice[] slices; - - private int idx = 0; - private Iterator<Cell> currentSlice; - - SliceIterator(Object[] btree, Comparator<Object> comparator, 
boolean forwards, ColumnSlice[] slices) - { - this.btree = btree; - this.comparator = comparator; - this.slices = slices; - this.forwards = forwards; - } - - protected Cell computeNext() - { - while (currentSlice != null || idx < slices.length) - { - if (currentSlice == null) - { - ColumnSlice slice = slices[idx++]; - if (forwards) - currentSlice = slice(btree, comparator, slice.start, slice.finish, true); - else - currentSlice = slice(btree, comparator, slice.finish, slice.start, false); - } - - if (currentSlice.hasNext()) - return currentSlice.next(); - - currentSlice = null; - } - - return endOfData(); - } - } - - private static Iterator<Cell> slice(Object[] btree, Comparator<Object> comparator, Composite start, Composite finish, boolean forwards) - { - return BTree.slice(btree, - comparator, - start.isEmpty() ? null : start, - true, - finish.isEmpty() ? null : finish, - true, - forwards); - } -}
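Note (not part of the patch): the removed AtomicBTreeColumns above applies each write optimistically, building a new immutable Holder snapshot and CAS-ing it in, while tracking how much work is wasted by lost races; once the waste budget is exhausted it falls back to pessimistic locking so at most one writer keeps retrying. The sketch below is a minimal, generic illustration of that pattern under stated assumptions, not the Cassandra class itself: OptimisticHolder, applyUpdate and the flat byte budget are hypothetical names, a ReentrantLock stands in for Locks.monitorEnterUnsafe, and the real wasteTracker additionally decays against real time (CLOCK_SHIFT / ALLOCATION_GRANULARITY_BYTES) rather than using a simple cumulative counter.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.UnaryOperator;

    // Hypothetical, simplified illustration of the optimistic-update-with-fallback pattern.
    final class OptimisticHolder<T>
    {
        // Immutable snapshot swapped atomically, in the spirit of AtomicBTreeColumns.Holder.
        private final AtomicReference<T> ref;
        // Rough count of bytes of work thrown away by failed CAS attempts.
        private final AtomicLong wastedBytes = new AtomicLong();
        // Stand-in for the unsafe monitor used by the real code.
        private final ReentrantLock lock = new ReentrantLock();
        // Flat waste budget; the real class uses a time-decayed allowance instead.
        private static final long WASTE_LIMIT_BYTES = 10 * 1024 * 1024L;

        OptimisticHolder(T initial)
        {
            this.ref = new AtomicReference<>(initial);
        }

        T update(UnaryOperator<T> applyUpdate, long estimatedBytesPerAttempt)
        {
            boolean locked = false;
            try
            {
                while (true)
                {
                    T current = ref.get();
                    // Build the replacement snapshot; if the CAS below loses, this work is discarded.
                    T next = applyUpdate.apply(current);
                    if (ref.compareAndSet(current, next))
                        return next;

                    // Lost the race: account for the wasted work, and once the budget is
                    // exhausted acquire the lock so at most one writer keeps burning
                    // CPU and allocations on retries. The CAS is still used under the
                    // lock, since unlocked writers may continue to race.
                    if (!locked && wastedBytes.addAndGet(estimatedBytesPerAttempt) >= WASTE_LIMIT_BYTES)
                    {
                        lock.lock();
                        locked = true;
                    }
                }
            }
            finally
            {
                if (locked)
                    lock.unlock();
            }
        }
    }

The appeal of this structure is that uncontended writes never block and never allocate lock state, while a heavily contended partition degrades gracefully into mutual exclusion instead of livelocking on repeated rebuilds of the same BTree.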
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 6038475..83a0654 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.Token; @@ -138,12 +139,12 @@ public class BatchlogManager implements BatchlogManagerMBean @VisibleForTesting static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now) { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog); - CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now); - adder.add("data", serializeMutations(mutations, version)) - .add("written_at", new Date(now / 1000)) - .add("version", version); - return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid), cf); + return new RowUpdateBuilder(SystemKeyspace.Batchlog, now, uuid) + .clustering() + .add("data", serializeMutations(mutations, version)) + .add("written_at", new Date(now / 1000)) + .add("version", version) + .build(); } private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version) @@ -186,7 +187,7 @@ public class BatchlogManager implements BatchlogManagerMBean SystemKeyspace.NAME, SystemKeyspace.BATCHLOG, PAGE_SIZE), - id); + id); } cleanup(); @@ -196,8 +197,8 @@ public class BatchlogManager implements BatchlogManagerMBean private void deleteBatch(UUID id) { - Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id)); - mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); + Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(id))); + mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); mutation.apply(); } @@ -382,7 +383,7 @@ public class BatchlogManager implements BatchlogManagerMBean { Set<InetAddress> liveEndpoints = new HashSet<>(); String ks = mutation.getKeyspaceName(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); + Token tk = mutation.key().getToken(); for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BufferCell.java b/src/java/org/apache/cassandra/db/BufferCell.java deleted file mode 100644 index a7d632d..0000000 --- a/src/java/org/apache/cassandra/db/BufferCell.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNames; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -public class BufferCell extends AbstractCell -{ - private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(CellNames.simpleDense(ByteBuffer.allocate(1)))); - - protected final CellName name; - protected final ByteBuffer value; - protected final long timestamp; - - BufferCell(CellName name) - { - this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER); - } - - public BufferCell(CellName name, ByteBuffer value) - { - this(name, value, 0); - } - - public BufferCell(CellName name, ByteBuffer value, long timestamp) - { - assert name != null; - assert value != null; - - this.name = name; - this.value = value; - this.timestamp = timestamp; - } - - @Override - public Cell withUpdatedName(CellName newName) - { - return new BufferCell(newName, value, timestamp); - } - - @Override - public Cell withUpdatedTimestamp(long newTimestamp) - { - return new BufferCell(name, value, newTimestamp); - } - - @Override - public CellName name() { - return name; - } - - @Override - public ByteBuffer value() { - return value; - } - - @Override - public long timestamp() { - return timestamp; - } - - @Override - public long unsharedHeapSizeExcludingData() - { - return EMPTY_SIZE + name.unsharedHeapSizeExcludingData() + ObjectSizes.sizeOnHeapExcludingData(value); - } - - @Override - public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator) - { - return new BufferCell(name.copy(metadata, allocator), allocator.clone(value), timestamp); - } - - @Override - public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) - { - return allocator.clone(this, metadata, opGroup); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferCounterCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BufferCounterCell.java b/src/java/org/apache/cassandra/db/BufferCounterCell.java deleted file mode 100644 index 827182a..0000000 --- a/src/java/org/apache/cassandra/db/BufferCounterCell.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; -import java.security.MessageDigest; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -public class BufferCounterCell extends BufferCell implements CounterCell -{ - private final long timestampOfLastDelete; - - public BufferCounterCell(CellName name, ByteBuffer value, long timestamp) - { - this(name, value, timestamp, Long.MIN_VALUE); - } - - public BufferCounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete) - { - super(name, value, timestamp); - this.timestampOfLastDelete = timestampOfLastDelete; - } - - public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag) - { - if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value))) - value = contextManager.clearAllLocal(value); - return new BufferCounterCell(name, value, timestamp, timestampOfLastDelete); - } - - // For use by tests of compatibility with pre-2.1 counter only. - public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete) - { - return new BufferCounterCell(name, contextManager.createLocal(value), timestamp, timestampOfLastDelete); - } - - @Override - public Cell withUpdatedName(CellName newName) - { - return new BufferCounterCell(newName, value, timestamp, timestampOfLastDelete); - } - - @Override - public long timestampOfLastDelete() - { - return timestampOfLastDelete; - } - - @Override - public long total() - { - return contextManager.total(value); - } - - @Override - public int cellDataSize() - { - // A counter column adds 8 bytes for timestampOfLastDelete to Cell. - return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete); - } - - @Override - public int serializedSize(CellNameType type, TypeSizes typeSizes) - { - return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete); - } - - @Override - public Cell diff(Cell cell) - { - return diffCounter(cell); - } - - /* - * We have to special case digest creation for counter column because - * we don't want to include the information about which shard of the - * context is a delta or not, since this information differs from node to - * node. 
- */ - @Override - public void updateDigest(MessageDigest digest) - { - digest.update(name().toByteBuffer().duplicate()); - // We don't take the deltas into account in a digest - contextManager.updateDigest(digest, value()); - - FBUtilities.updateWithLong(digest, timestamp); - FBUtilities.updateWithByte(digest, serializationFlags()); - FBUtilities.updateWithLong(digest, timestampOfLastDelete); - } - - @Override - public Cell reconcile(Cell cell) - { - return reconcileCounter(cell); - } - - @Override - public boolean hasLegacyShards() - { - return contextManager.hasLegacyShards(value); - } - - @Override - public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator) - { - return new BufferCounterCell(name.copy(metadata, allocator), allocator.clone(value), timestamp, timestampOfLastDelete); - } - - @Override - public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) - { - return allocator.clone(this, metadata, opGroup); - } - - @Override - public String getString(CellNameType comparator) - { - return String.format("%s:false:%s@%d!%d", - comparator.getString(name()), - contextManager.toString(value()), - timestamp(), - timestampOfLastDelete); - } - - @Override - public int serializationFlags() - { - return ColumnSerializer.COUNTER_MASK; - } - - @Override - public void validateFields(CFMetaData metadata) throws MarshalException - { - validateName(metadata); - // We cannot use the value validator as for other columns as the CounterColumnType validate a long, - // which is not the internal representation of counters - contextManager.validateContext(value()); - } - - @Override - public Cell markLocalToBeCleared() - { - ByteBuffer marked = contextManager.markLocalToBeCleared(value()); - return marked == value() ? this : new BufferCounterCell(name(), marked, timestamp(), timestampOfLastDelete); - } - - @Override - public boolean equals(Cell cell) - { - return super.equals(cell) && timestampOfLastDelete == ((CounterCell) cell).timestampOfLastDelete(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java b/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java deleted file mode 100644 index f7df3ea..0000000 --- a/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -public class BufferCounterUpdateCell extends BufferCell implements CounterUpdateCell -{ - public BufferCounterUpdateCell(CellName name, long value, long timestamp) - { - this(name, ByteBufferUtil.bytes(value), timestamp); - } - - public BufferCounterUpdateCell(CellName name, ByteBuffer value, long timestamp) - { - super(name, value, timestamp); - } - - @Override - public Cell withUpdatedName(CellName newName) - { - return new BufferCounterUpdateCell(newName, value, timestamp); - } - - public long delta() - { - return value().getLong(value.position()); - } - - @Override - public Cell diff(Cell cell) - { - // Diff is used during reads, but we should never read those columns - throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateCell."); - } - - @Override - public Cell reconcile(Cell cell) - { - // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346. - if (cell instanceof DeletedCell) - return cell; - - assert cell instanceof CounterUpdateCell : "Wrong class type."; - - // The only time this could happen is if a batch ships two increments for the same cell. Hence we simply sum the deltas. - return new BufferCounterUpdateCell(name, delta() + ((CounterUpdateCell) cell).delta(), Math.max(timestamp, cell.timestamp())); - } - - @Override - public int serializationFlags() - { - return ColumnSerializer.COUNTER_UPDATE_MASK; - } - - @Override - public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator) - { - throw new UnsupportedOperationException(); - } - - @Override - public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) - { - throw new UnsupportedOperationException(); - } - - @Override - public String getString(CellNameType comparator) - { - return String.format("%s:%s@%d", comparator.getString(name()), ByteBufferUtil.toLong(value), timestamp()); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferDeletedCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BufferDeletedCell.java b/src/java/org/apache/cassandra/db/BufferDeletedCell.java deleted file mode 100644 index a38f322..0000000 --- a/src/java/org/apache/cassandra/db/BufferDeletedCell.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; -import java.security.MessageDigest; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -public class BufferDeletedCell extends BufferCell implements DeletedCell -{ - public BufferDeletedCell(CellName name, int localDeletionTime, long timestamp) - { - this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp); - } - - public BufferDeletedCell(CellName name, ByteBuffer value, long timestamp) - { - super(name, value, timestamp); - } - - @Override - public Cell withUpdatedName(CellName newName) - { - return new BufferDeletedCell(newName, value, timestamp); - } - - @Override - public Cell withUpdatedTimestamp(long newTimestamp) - { - return new BufferDeletedCell(name, value, newTimestamp); - } - - @Override - public boolean isLive() - { - return false; - } - - @Override - public boolean isLive(long now) - { - return false; - } - - @Override - public int getLocalDeletionTime() - { - return value().getInt(value.position()); - } - - @Override - public Cell reconcile(Cell cell) - { - if (cell instanceof DeletedCell) - return super.reconcile(cell); - return cell.reconcile(this); - } - - @Override - public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator) - { - return new BufferDeletedCell(name.copy(metadata, allocator), allocator.clone(value), timestamp); - } - - @Override - public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) - { - return allocator.clone(this, metadata, opGroup); - } - - @Override - public int serializationFlags() - { - return ColumnSerializer.DELETION_MASK; - } - - @Override - public void validateFields(CFMetaData metadata) throws MarshalException - { - validateName(metadata); - if (value().remaining() != 4) - throw new MarshalException("A tombstone value should be 4 bytes long"); - if (getLocalDeletionTime() < 0) - throw new MarshalException("The local deletion time should not be negative"); - } - - @Override - public void updateDigest(MessageDigest digest) - { - digest.update(name().toByteBuffer().duplicate()); - - FBUtilities.updateWithLong(digest, timestamp()); - FBUtilities.updateWithByte(digest, serializationFlags()); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferExpiringCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BufferExpiringCell.java b/src/java/org/apache/cassandra/db/BufferExpiringCell.java deleted file mode 100644 index efb56d5..0000000 --- a/src/java/org/apache/cassandra/db/BufferExpiringCell.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; -import java.security.MessageDigest; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -public class BufferExpiringCell extends BufferCell implements ExpiringCell -{ - private final int localExpirationTime; - private final int timeToLive; - - public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive) - { - this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive); - } - - public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime) - { - super(name, value, timestamp); - assert timeToLive > 0 : timeToLive; - assert localExpirationTime > 0 : localExpirationTime; - this.timeToLive = timeToLive; - this.localExpirationTime = localExpirationTime; - } - - public int getTimeToLive() - { - return timeToLive; - } - - @Override - public Cell withUpdatedName(CellName newName) - { - return new BufferExpiringCell(newName, value(), timestamp(), timeToLive, localExpirationTime); - } - - @Override - public Cell withUpdatedTimestamp(long newTimestamp) - { - return new BufferExpiringCell(name(), value(), newTimestamp, timeToLive, localExpirationTime); - } - - @Override - public int cellDataSize() - { - return super.cellDataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive); - } - - @Override - public int serializedSize(CellNameType type, TypeSizes typeSizes) - { - /* - * An expired column adds to a Cell : - * 4 bytes for the localExpirationTime - * + 4 bytes for the timeToLive - */ - return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive); - } - - @Override - public void updateDigest(MessageDigest digest) - { - super.updateDigest(digest); - FBUtilities.updateWithInt(digest, timeToLive); - } - - @Override - public int getLocalDeletionTime() - { - return localExpirationTime; - } - - @Override - public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator) - { - return new BufferExpiringCell(name.copy(metadata, allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime); - } - - @Override - public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) - { - return allocator.clone(this, metadata, opGroup); - } - - @Override - public String getString(CellNameType comparator) - { - return String.format("%s!%d", super.getString(comparator), timeToLive); - } - - @Override - public 
boolean isLive() - { - return isLive(System.currentTimeMillis()); - } - - @Override - public boolean isLive(long now) - { - return (int) (now / 1000) < getLocalDeletionTime(); - } - - @Override - public int serializationFlags() - { - return ColumnSerializer.EXPIRATION_MASK; - } - - @Override - public void validateFields(CFMetaData metadata) throws MarshalException - { - super.validateFields(metadata); - - if (timeToLive <= 0) - throw new MarshalException("A column TTL should be > 0, but was " + timeToLive); - if (localExpirationTime < 0) - throw new MarshalException("The local expiration time should not be negative but was " + localExpirationTime); - } - - @Override - public Cell reconcile(Cell cell) - { - long ts1 = timestamp(), ts2 = cell.timestamp(); - if (ts1 != ts2) - return ts1 < ts2 ? cell : this; - // we should prefer tombstones - if (cell instanceof DeletedCell) - return cell; - int c = value().compareTo(cell.value()); - if (c != 0) - return c < 0 ? cell : this; - // If we have same timestamp and value, prefer the longest ttl - if (cell instanceof ExpiringCell) - { - int let1 = localExpirationTime, let2 = cell.getLocalDeletionTime(); - if (let1 < let2) - return cell; - } - return this; - } - - @Override - public boolean equals(Cell cell) - { - if (!super.equals(cell)) - return false; - ExpiringCell that = (ExpiringCell) cell; - return getLocalDeletionTime() == that.getLocalDeletionTime() && getTimeToLive() == that.getTimeToLive(); - } - - /** @return Either a DeletedCell, or an ExpiringCell. */ - public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag) - { - if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE) - return new BufferExpiringCell(name, value, timestamp, timeToLive, localExpirationTime); - // The column is now expired, we can safely return a simple tombstone. Note that - // as long as the expiring column and the tombstone put together live longer than GC grace seconds, - // we'll fulfil our responsibility to repair. See discussion at - // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html - return new BufferDeletedCell(name, localExpirationTime - timeToLive, timestamp); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CBuilder.java b/src/java/org/apache/cassandra/db/CBuilder.java new file mode 100644 index 0000000..56cabf1 --- /dev/null +++ b/src/java/org/apache/cassandra/db/CBuilder.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.cassandra.db.marshal.AbstractType; + +/** + * Allows to build ClusteringPrefixes, either Clustering or Slice.Bound. + */ +public abstract class CBuilder +{ + public static CBuilder STATIC_BUILDER = new CBuilder() + { + public int count() + { + return 0; + } + + public int remainingCount() + { + return 0; + } + + public ClusteringComparator comparator() + { + throw new UnsupportedOperationException(); + } + + public CBuilder add(ByteBuffer value) + { + throw new UnsupportedOperationException(); + } + + public CBuilder add(Object value) + { + throw new UnsupportedOperationException(); + } + + public Clustering build() + { + return Clustering.STATIC_CLUSTERING; + } + + public Slice.Bound buildBound(boolean isStart, boolean isInclusive) + { + throw new UnsupportedOperationException(); + } + + public Slice buildSlice() + { + throw new UnsupportedOperationException(); + } + + public Clustering buildWith(ByteBuffer value) + { + throw new UnsupportedOperationException(); + } + + public Clustering buildWith(List<ByteBuffer> newValues) + { + throw new UnsupportedOperationException(); + } + + public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive) + { + throw new UnsupportedOperationException(); + } + + public Slice.Bound buildBoundWith(List<ByteBuffer> newValues, boolean isStart, boolean isInclusive) + { + throw new UnsupportedOperationException(); + } + }; + + public static CBuilder create(ClusteringComparator comparator) + { + return new ArrayBackedBuilder(comparator); + } + + public abstract int count(); + public abstract int remainingCount(); + public abstract ClusteringComparator comparator(); + public abstract CBuilder add(ByteBuffer value); + public abstract CBuilder add(Object value); + public abstract Clustering build(); + public abstract Slice.Bound buildBound(boolean isStart, boolean isInclusive); + public abstract Slice buildSlice(); + public abstract Clustering buildWith(ByteBuffer value); + public abstract Clustering buildWith(List<ByteBuffer> newValues); + public abstract Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive); + public abstract Slice.Bound buildBoundWith(List<ByteBuffer> newValues, boolean isStart, boolean isInclusive); + + private static class ArrayBackedBuilder extends CBuilder + { + private final ClusteringComparator type; + private final ByteBuffer[] values; + private int size; + private boolean built; + + public ArrayBackedBuilder(ClusteringComparator type) + { + this.type = type; + this.values = new ByteBuffer[type.size()]; + } + + public int count() + { + return size; + } + + public int remainingCount() + { + return values.length - size; + } + + public ClusteringComparator comparator() + { + return type; + } + + public CBuilder add(ByteBuffer value) + { + if (isDone()) + throw new IllegalStateException(); + values[size++] = value; + return this; + } + + public CBuilder add(Object value) + { + return add(((AbstractType)type.subtype(size)).decompose(value)); + } + + private boolean isDone() + { + return remainingCount() == 0 || built; + } + + public Clustering build() + { + // We don't allow to add more element to a builder that has been built so + // that we don't have to copy values. 
+ built = true; + + // Currently, only dense table can leave some clustering column out (see #7990) + return size == 0 ? Clustering.EMPTY : new SimpleClustering(values); + } + + public Slice.Bound buildBound(boolean isStart, boolean isInclusive) + { + // We don't allow to add more element to a builder that has been built so + // that we don't have to copy values (even though we have to do it in most cases). + built = true; + + if (size == 0) + return isStart ? Slice.Bound.BOTTOM : Slice.Bound.TOP; + + return Slice.Bound.create(Slice.Bound.boundKind(isStart, isInclusive), + size == values.length ? values : Arrays.copyOfRange(values, 0, size)); + } + + public Slice buildSlice() + { + // We don't allow to add more element to a builder that has been built so + // that we don't have to copy values. + built = true; + + if (size == 0) + return Slice.ALL; + + return Slice.make(buildBound(true, true), buildBound(false, true)); + } + + public Clustering buildWith(ByteBuffer value) + { + assert size+1 == type.size(); + + ByteBuffer[] newValues = Arrays.copyOf(values, size+1); + newValues[size] = value; + return new SimpleClustering(newValues); + } + + public Clustering buildWith(List<ByteBuffer> newValues) + { + assert size + newValues.size() == type.size(); + ByteBuffer[] buffers = Arrays.copyOf(values, size + newValues.size()); + int newSize = size; + for (ByteBuffer value : newValues) + buffers[newSize++] = value; + + return new SimpleClustering(buffers); + } + + public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive) + { + ByteBuffer[] newValues = Arrays.copyOf(values, size+1); + newValues[size] = value; + return Slice.Bound.create(Slice.Bound.boundKind(isStart, isInclusive), newValues); + } + + public Slice.Bound buildBoundWith(List<ByteBuffer> newValues, boolean isStart, boolean isInclusive) + { + ByteBuffer[] buffers = Arrays.copyOf(values, size + newValues.size()); + int newSize = size; + for (ByteBuffer value : newValues) + buffers[newSize++] = value; + + return Slice.Bound.create(Slice.Bound.boundKind(isStart, isInclusive), buffers); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CFRowAdder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java deleted file mode 100644 index 6fab8d5..0000000 --- a/src/java/org/apache/cassandra/db/CFRowAdder.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CollectionType; -import org.apache.cassandra.db.marshal.ListType; -import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.UUIDGen; - -/** - * Convenience object to populate a given CQL3 row in a ColumnFamily object. - * - * This is meant for when performance is not of the utmost importance. When - * performance matters, it might be worth allocating such builder. - */ -public class CFRowAdder -{ - public final ColumnFamily cf; - public final Composite prefix; - public final long timestamp; - public final int ttl; - private final int ldt; - - public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp) - { - this(cf, prefix, timestamp, 0); - } - - public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp, int ttl) - { - this.cf = cf; - this.prefix = prefix; - this.timestamp = timestamp; - this.ttl = ttl; - this.ldt = (int) (System.currentTimeMillis() / 1000); - - // If a CQL3 table, add the row marker - if (cf.metadata().isCQL3Table() && !prefix.isStatic()) - cf.addColumn(new BufferCell(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp)); - } - - public CFRowAdder add(String cql3ColumnName, Object value) - { - ColumnDefinition def = getDefinition(cql3ColumnName); - return add(cf.getComparator().create(prefix, def), def, value); - } - - public CFRowAdder resetCollection(String cql3ColumnName) - { - ColumnDefinition def = getDefinition(cql3ColumnName); - assert def.type.isCollection() && def.type.isMultiCell(); - Composite name = cf.getComparator().create(prefix, def); - cf.addAtom(new RangeTombstone(name.start(), name.end(), timestamp - 1, ldt)); - return this; - } - - public CFRowAdder addMapEntry(String cql3ColumnName, Object key, Object value) - { - ColumnDefinition def = getDefinition(cql3ColumnName); - assert def.type instanceof MapType; - MapType mt = (MapType)def.type; - CellName name = cf.getComparator().create(prefix, def, mt.getKeysType().decompose(key)); - return add(name, def, value); - } - - public CFRowAdder addListEntry(String cql3ColumnName, Object value) - { - ColumnDefinition def = getDefinition(cql3ColumnName); - assert def.type instanceof ListType; - CellName name = cf.getComparator().create(prefix, def, ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())); - return add(name, def, value); - } - - private ColumnDefinition getDefinition(String name) - { - return cf.metadata().getColumnDefinition(new ColumnIdentifier(name, false)); - } - - private CFRowAdder add(CellName name, ColumnDefinition def, Object value) - { - if (value == null) - { - cf.addColumn(new BufferDeletedCell(name, ldt, timestamp)); - } - else - { - AbstractType valueType = def.type.isCollection() - ? ((CollectionType) def.type).valueComparator() - : def.type; - ByteBuffer valueBytes = value instanceof ByteBuffer ? 
(ByteBuffer)value : valueType.decompose(value); - if (ttl == 0) - cf.addColumn(new BufferCell(name, valueBytes, timestamp)); - else - cf.addColumn(new BufferExpiringCell(name, valueBytes, timestamp, ttl)); - } - return this; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java deleted file mode 100644 index 7c3926a..0000000 --- a/src/java/org/apache/cassandra/db/Cell.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.memory.MemtableAllocator; - -/** - * Cell is immutable, which prevents all kinds of confusion in a multithreaded environment. 
- */ -public interface Cell extends OnDiskAtom -{ - public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT; - - public Cell withUpdatedName(CellName newName); - - public Cell withUpdatedTimestamp(long newTimestamp); - - @Override - public CellName name(); - - public ByteBuffer value(); - - public boolean isLive(); - - public boolean isLive(long now); - - public int cellDataSize(); - - // returns the size of the Cell and all references on the heap, excluding any costs associated with byte arrays - // that would be allocated by a localCopy, as these will be accounted for by the allocator - public long unsharedHeapSizeExcludingData(); - - public int serializedSize(CellNameType type, TypeSizes typeSizes); - - public int serializationFlags(); - - public Cell diff(Cell cell); - - public Cell reconcile(Cell cell); - - public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator); - - public Cell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup); - - public String getString(CellNameType comparator); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Clusterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clusterable.java b/src/java/org/apache/cassandra/db/Clusterable.java new file mode 100644 index 0000000..62ab9dc --- /dev/null +++ b/src/java/org/apache/cassandra/db/Clusterable.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +/** + * Common class for objects that are identified by a clustering prefix, and can be thus sorted by a + * {@link ClusteringComparator}. + */ +public interface Clusterable +{ + public ClusteringPrefix clustering(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Clustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java new file mode 100644 index 0000000..5ac9671 --- /dev/null +++ b/src/java/org/apache/cassandra/db/Clustering.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * The clustering column values for a row. + * <p> + * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have + * as many values as there are clustering columns in the table it is part of. It is the clustering + * prefix used by rows. + * <p> + * Note however that while its size must be equal to the table clustering size, a clustering can have + * {@code null} values, and this is mostly for thrift backward compatibility (in practice, if a value is null, + * all of the following ones will be too because that's what thrift allows, but it's never assumed by the + * code so we could start generally allowing nulls for clustering columns if we wanted to). + */ +public abstract class Clustering extends AbstractClusteringPrefix +{ + public static final Serializer serializer = new Serializer(); + + /** + * The special cased clustering used by all static rows. It is a special case in the + * sense that it's always empty, no matter how many clustering columns the table has. + */ + public static final Clustering STATIC_CLUSTERING = new EmptyClustering() + { + @Override + public Kind kind() + { + return Kind.STATIC_CLUSTERING; + } + + @Override + public String toString(CFMetaData metadata) + { + return "STATIC"; + } + }; + + /** Empty clustering for tables having no clustering columns. */ + public static final Clustering EMPTY = new EmptyClustering(); + + public Kind kind() + { + return Kind.CLUSTERING; + } + + public Clustering takeAlias() + { + ByteBuffer[] values = new ByteBuffer[size()]; + for (int i = 0; i < size(); i++) + values[i] = get(i); + return new SimpleClustering(values); + } + + public String toString(CFMetaData metadata) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size(); i++) + { + ColumnDefinition c = metadata.clusteringColumns().get(i); + sb.append(i == 0 ? "" : ", ").append(c.name).append("=").append(get(i) == null ? "null" : c.type.getString(get(i))); + } + return sb.toString(); + } + + public String toCQLString(CFMetaData metadata) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size(); i++) + { + ColumnDefinition c = metadata.clusteringColumns().get(i); + sb.append(i == 0 ?
"" : ", ").append(c.type.getString(get(i))); + } + return sb.toString(); + } + + private static class EmptyClustering extends Clustering + { + private static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0]; + + public int size() + { + return 0; + } + + public ByteBuffer get(int i) + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer[] getRawValues() + { + return EMPTY_VALUES_ARRAY; + } + + @Override + public Clustering takeAlias() + { + return this; + } + + @Override + public long unsharedHeapSize() + { + return 0; + } + + @Override + public String toString(CFMetaData metadata) + { + return "EMPTY"; + } + } + + /** + * Serializer for Clustering objects. + * <p> + * Because every clustering in a given table must have the same size (and that size cannot actually change once the table + * has been defined), we don't record that size. + */ + public static class Serializer + { + public void serialize(Clustering clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types); + } + + public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types, TypeSizes sizes) + { + return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types, sizes); + } + + public void deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException + { + ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types, writer); + } + + public Clustering deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException + { + SimpleClustering.Builder builder = SimpleClustering.builder(types.size()); + deserialize(in, version, types, builder); + return builder.build(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ClusteringComparator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java new file mode 100644 index 0000000..b0e8e5c --- /dev/null +++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +import com.google.common.base.Joiner; + +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; + +/** + * A comparator of clustering prefixes (or more generally of {@link Clusterable}). + * <p> + * This is essentially just a composite comparator that compares the clustering values of the provided + * clustering prefixes in lexicographical order, with each component being compared based on + * the type of the clustering column this is a value of. + */ +public class ClusteringComparator implements Comparator<Clusterable> +{ + private final List<AbstractType<?>> clusteringTypes; + private final boolean isByteOrderComparable; + + private final Comparator<IndexInfo> indexComparator; + private final Comparator<IndexInfo> indexReverseComparator; + private final Comparator<Clusterable> reverseComparator; + + public ClusteringComparator(AbstractType<?>... clusteringTypes) + { + this(Arrays.<AbstractType<?>>asList(clusteringTypes)); + } + + public ClusteringComparator(List<AbstractType<?>> clusteringTypes) + { + this.clusteringTypes = clusteringTypes; + this.isByteOrderComparable = isByteOrderComparable(clusteringTypes); + + this.indexComparator = new Comparator<IndexInfo>() + { + public int compare(IndexInfo o1, IndexInfo o2) + { + return ClusteringComparator.this.compare(o1.lastName, o2.lastName); + } + }; + this.indexReverseComparator = new Comparator<IndexInfo>() + { + public int compare(IndexInfo o1, IndexInfo o2) + { + return ClusteringComparator.this.compare(o1.firstName, o2.firstName); + } + }; + this.reverseComparator = new Comparator<Clusterable>() + { + public int compare(Clusterable c1, Clusterable c2) + { + return ClusteringComparator.this.compare(c2, c1); + } + }; + } + + private static boolean isByteOrderComparable(Iterable<AbstractType<?>> types) + { + boolean isByteOrderComparable = true; + for (AbstractType<?> type : types) + isByteOrderComparable &= type.isByteOrderComparable(); + return isByteOrderComparable; + } + + /** + * The number of clustering columns for the table this is the comparator of. + */ + public int size() + { + return clusteringTypes.size(); + } + + /** + * The "subtypes" of this clustering comparator, that is the types of the clustering + * columns for the table this is a comparator of. + */ + public List<AbstractType<?>> subtypes() + { + return clusteringTypes; + } + + /** + * Returns the type of the ith clustering column of the table. + */ + public AbstractType<?> subtype(int i) + { + return clusteringTypes.get(i); + } + + /** + * Creates a row clustering based on the clustering values. + * <p> + * Every argument can either be a {@code ByteBuffer}, in which case it is used as-is, or an object + * corresponding to the type of the corresponding clustering column, in which case it will be + * converted to a byte buffer using the column type. + * + * @param values the values to use for the created clustering. There should be exactly {@code size()} + * values which must be either byte buffers or of the type the column expects. + * + * @return the newly created clustering. + */ + public Clustering make(Object...
values) + { + if (values.length != size()) + throw new IllegalArgumentException(String.format("Invalid number of components, expecting %d but got %d", size(), values.length)); + + CBuilder builder = CBuilder.create(this); + for (int i = 0; i < values.length; i++) + { + Object val = values[i]; + if (val instanceof ByteBuffer) + builder.add((ByteBuffer)val); + else + builder.add(val); + } + return builder.build(); + } + + public int compare(Clusterable c1, Clusterable c2) + { + return compare(c1.clustering(), c2.clustering()); + } + + public int compare(ClusteringPrefix c1, ClusteringPrefix c2) + { + int s1 = c1.size(); + int s2 = c2.size(); + int minSize = Math.min(s1, s2); + + for (int i = 0; i < minSize; i++) + { + int cmp = compareComponent(i, c1.get(i), c2.get(i)); + if (cmp != 0) + return cmp; + } + + if (s1 == s2) + return ClusteringPrefix.Kind.compare(c1.kind(), c2.kind()); + + return s1 < s2 ? c1.kind().prefixComparisonResult : -c2.kind().prefixComparisonResult; + } + + public int compare(Clustering c1, Clustering c2) + { + for (int i = 0; i < size(); i++) + { + int cmp = compareComponent(i, c1.get(i), c2.get(i)); + if (cmp != 0) + return cmp; + } + return 0; + } + + public int compareComponent(int i, ByteBuffer v1, ByteBuffer v2) + { + if (v1 == null) + return v2 == null ? 0 : -1; + if (v2 == null) + return 1; + + return isByteOrderComparable + ? ByteBufferUtil.compareUnsigned(v1, v2) + : clusteringTypes.get(i).compare(v1, v2); + } + + /** + * Returns whether this clustering comparator is compatible with the provided one, + * that is if the provided one can be safely replaced by this new one. + * + * @param previous the previous comparator that we want to replace and test + * compatibility with. + * + * @return whether {@code previous} can be safely replaced by this comparator. + */ + public boolean isCompatibleWith(ClusteringComparator previous) + { + if (this == previous) + return true; + + // Extending with new components is fine, shrinking is not + if (size() < previous.size()) + return false; + + for (int i = 0; i < previous.size(); i++) + { + AbstractType<?> tprev = previous.subtype(i); + AbstractType<?> tnew = subtype(i); + if (!tnew.isCompatibleWith(tprev)) + return false; + } + return true; + } + + /** + * Validates the provided prefix for corrupted data. + * + * @param clustering the clustering prefix to validate. + * + * @throws MarshalException if {@code clustering} contains some invalid data. + */ + public void validate(ClusteringPrefix clustering) + { + for (int i = 0; i < clustering.size(); i++) + { + ByteBuffer value = clustering.get(i); + if (value != null) + subtype(i).validate(value); + } + } + + public Comparator<IndexInfo> indexComparator(boolean reversed) + { + return reversed ? indexReverseComparator : indexComparator; + } + + public Comparator<Clusterable> reversed() + { + return reverseComparator; + } + + /** + * Whether the two provided clustering prefixes are on the same clustering values. + * + * @param c1 the first prefix. + * @param c2 the second prefix. + * @return whether {@code c1} and {@code c2} have the same clustering values (but not necessarily + * the same "kind") or not.
+ */ + public boolean isOnSameClustering(ClusteringPrefix c1, ClusteringPrefix c2) + { + if (c1.size() != c2.size()) + return false; + + for (int i = 0; i < c1.size(); i++) + { + if (compareComponent(i, c1.get(i), c2.get(i)) != 0) + return false; + } + return true; + } + + @Override + public String toString() + { + return String.format("comparator(%s)", Joiner.on(", ").join(clusteringTypes)); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof ClusteringComparator)) + return false; + + ClusteringComparator that = (ClusteringComparator)o; + return this.clusteringTypes.equals(that.clusteringTypes); + } + + @Override + public int hashCode() + { + return Objects.hashCode(clusteringTypes); + } +}
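As a usage sketch (not part of the commit above), the new comparator can be exercised roughly as follows; the example assumes the existing marshal types UTF8Type and Int32Type from the same source tree and relies only on the make(), compare() and compareComponent() methods shown in the diff:

// Minimal usage sketch, not part of this commit. Assumes UTF8Type/Int32Type
// plus the ClusteringComparator and Clustering classes added by this change.
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;

public class ClusteringComparatorSketch
{
    public static void main(String[] args)
    {
        // Comparator for a table whose clustering columns are (text, int).
        ClusteringComparator comparator = new ClusteringComparator(UTF8Type.instance, Int32Type.instance);

        // make() takes either raw objects (decomposed through the column type)
        // or ByteBuffers, which are used as-is.
        Clustering a = comparator.make("foo", 1);
        Clustering b = comparator.make("foo", Int32Type.instance.decompose(2));

        // First components are equal, so the second one decides: 1 < 2.
        System.out.println(comparator.compare(a, b) < 0);   // true

        // A null component sorts before any non-null value.
        System.out.println(comparator.compareComponent(0, null, UTF8Type.instance.decompose("foo")) < 0); // true
    }
}

Note that compare() only consults the per-column types when at least one of them is not byte-order comparable; otherwise the raw values are compared with ByteBufferUtil.compareUnsigned, as the isByteOrderComparable flag above shows.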

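On the Clustering side, the two zero-length sentinels (EMPTY and STATIC_CLUSTERING) are distinguished only by kind(), which is also the tie-breaker compare() falls back to for prefixes of equal size. A small illustrative check, again not part of the commit, might read:

// Illustrative only; relies on the Clustering constants introduced above.
import org.apache.cassandra.db.Clustering;

public class ClusteringKindsSketch
{
    public static void main(String[] args)
    {
        // Both sentinels carry no values...
        System.out.println(Clustering.EMPTY.size());             // 0
        System.out.println(Clustering.STATIC_CLUSTERING.size()); // 0

        // ...but they report different kinds, which is what lets the
        // comparator tell a static row apart from an empty clustering.
        System.out.println(Clustering.EMPTY.kind());             // CLUSTERING
        System.out.println(Clustering.STATIC_CLUSTERING.kind()); // STATIC_CLUSTERING
    }
}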