http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java new file mode 100644 index 0000000..5bfd1a3 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.Objects; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator +{ + protected final CFMetaData metadata; + protected final DecoratedKey partitionKey; + protected final DeletionTime partitionLevelDeletion; + protected final PartitionColumns columns; + protected final Row staticRow; + protected final boolean isReverseOrder; + protected final RowStats stats; + + protected AbstractUnfilteredRowIterator(CFMetaData metadata, + DecoratedKey partitionKey, + DeletionTime partitionLevelDeletion, + PartitionColumns columns, + Row staticRow, + boolean isReverseOrder, + RowStats stats) + { + this.metadata = metadata; + this.partitionKey = partitionKey; + this.partitionLevelDeletion = partitionLevelDeletion; + this.columns = columns; + this.staticRow = staticRow; + this.isReverseOrder = isReverseOrder; + this.stats = stats; + } + + public CFMetaData metadata() + { + return metadata; + } + + public PartitionColumns columns() + { + return columns; + } + + public boolean isReverseOrder() + { + return isReverseOrder; + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public Row staticRow() + { + return staticRow; + } + + public RowStats stats() + { + return stats; + } + + public void close() + { + } + + public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b) + { + return Objects.equals(a.columns(), b.columns()) + && Objects.equals(a.metadata(), b.metadata()) + && Objects.equals(a.isReverseOrder(), b.isReverseOrder()) + && Objects.equals(a.partitionKey(), b.partitionKey()) + && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion()) + && Objects.equals(a.staticRow(), b.staticRow()) + && Objects.equals(a.stats(), b.stats()) + && Objects.equals(a.metadata(), b.metadata()) + && Iterators.elementsEqual(a, b); + } + +}
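A minimal sketch of how the abstract class above is meant to be extended (illustrative only, not part of the patch): a subclass supplies the partition-level values to the constructor and implements Guava's computeNext()/endOfData() contract to produce the actual Unfiltered elements. The sketch assumes the same imports as AbstractUnfilteredRowIterator.java and that the constructor arguments are supplied by the caller; it simply yields an empty partition.

    // Illustrative sketch: an UnfilteredRowIterator over an empty partition.
    class EmptyUnfilteredRowIterator extends AbstractUnfilteredRowIterator
    {
        EmptyUnfilteredRowIterator(CFMetaData metadata,
                                   DecoratedKey key,
                                   PartitionColumns columns,
                                   Row staticRow,
                                   RowStats stats)
        {
            // No partition-level deletion, forward iteration order.
            super(metadata, key, DeletionTime.LIVE, columns, staticRow, false, stats);
        }

        protected Unfiltered computeNext()
        {
            // Guava AbstractIterator contract: no rows or range tombstone markers to return.
            return endOfData();
        }
    }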
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java new file mode 100644 index 0000000..80bf901 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.security.MessageDigest; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.Aliasable; +import org.apache.cassandra.db.LivenessInfo; + +/** + * A cell holds a single "simple" value for a given column, as well as "liveness" + * informations regarding that value. + * <p> + * The is 2 kind of columns: simple ones and complex ones. + * Simple columns have only a single associated cell, while complex ones, + * the one corresponding to non-frozen collections and UDTs, are comprised + * of multiple cells. For complex columns, the different cells are distinguished + * by their cell path. + * <p> + * We can also distinguish different kind of cells based on the property of their + * {@link #livenessInfo}: + * 1) "Normal" cells: their liveness info has no ttl and no deletion time. + * 2) Expiring cells: their liveness info has both a ttl and a deletion time (the latter + * deciding when the cell is actually expired). + * 3) Tombstones/deleted cells: their liveness info has a deletion time but no ttl. Those + * cells don't really have a value but their {@link #value} method return an empty + * buffer by convention. + */ +public interface Cell extends Aliasable<Cell> +{ + /** + * The column this cell belongs to. + * + * @return the column this cell belongs to. + */ + public ColumnDefinition column(); + + /** + * Whether the cell is a counter cell or not. + * + * @return whether the cell is a counter cell or not. + */ + public boolean isCounterCell(); + + /** + * The cell value. + * + * @return the cell value. + */ + public ByteBuffer value(); + + /** + * The liveness info of the cell, that is its timestamp and whether it is + * expiring, deleted or none of the above. + * + * @return the cell {@link LivenessInfo}. + */ + public LivenessInfo livenessInfo(); + + /** + * Whether the cell is a tombstone or not. + * + * @return whether the cell is a tombstone or not. + */ + public boolean isTombstone(); + + /** + * Whether the cell is an expiring one or not. + * <p> + * Note that this only correspond to whether the cell liveness info + * have a TTL or not, but doesn't tells whether the cell is already expired + * or not. 
You should use {@link #isLive} for that latter information. + * + * @return whether the cell is an expiring one or not. + */ + public boolean isExpiring(); + + /** + * Whether the cell is live or not given the current time. + * + * @param nowInSec the current time in seconds. This is used to + * decide if an expiring cell is expired or live. + * @return whether the cell is live or not at {@code nowInSec}. + */ + public boolean isLive(int nowInSec); + + /** + * For cells belonging to complex types (non-frozen collection and UDT), the + * path to the cell. + * + * @return the cell path for cells of complex column, and {@code null} for other cells. + */ + public CellPath path(); + + /** + * Write the cell to the provided writer. + * + * @param writer the row writer to write the cell to. + */ + public void writeTo(Row.Writer writer); + + /** + * Adds the cell to the provided digest. + * + * @param digest the {@code MessageDigest} to add the cell to. + */ + public void digest(MessageDigest digest); + + /** + * Validate the cell value. + * + * @throws MarshalException if the cell value is not a valid value for + * the column type this is a cell of. + */ + public void validate(); + + /** + * The size of the data hold by this cell. + * + * This is mainly used to verify if batches goes over a given size. + * + * @return the size used by the data of this cell. + */ + public int dataSize(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/CellData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/CellData.java b/src/java/org/apache/cassandra/db/rows/CellData.java new file mode 100644 index 0000000..29eac01 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/CellData.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * Contains (non-counter) cell data for one or more rows. 
+ */ +class CellData +{ + private boolean isCounter; + + private ByteBuffer[] values; + private final LivenessInfoArray livenessInfos; + + CellData(int initialCellCapacity, boolean isCounter) + { + this.isCounter = isCounter; + this.values = new ByteBuffer[initialCellCapacity]; + this.livenessInfos = new LivenessInfoArray(initialCellCapacity); + } + + public void setCell(int idx, ByteBuffer value, LivenessInfo info) + { + ensureCapacity(idx); + values[idx] = value; + livenessInfos.set(idx, info); + } + + public boolean hasCell(int idx) + { + return idx < values.length && values[idx] != null; + } + + public ByteBuffer value(int idx) + { + return values[idx]; + } + + public void setValue(int idx, ByteBuffer value) + { + values[idx] = value; + } + + private void ensureCapacity(int idxToSet) + { + int originalCapacity = values.length; + if (idxToSet < originalCapacity) + return; + + int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, idxToSet); + + values = Arrays.copyOf(values, newCapacity); + livenessInfos.resize(newCapacity); + } + + // Swap cell i and j + public void swapCell(int i, int j) + { + ensureCapacity(Math.max(i, j)); + + ByteBuffer value = values[j]; + values[j] = values[i]; + values[i] = value; + + livenessInfos.swap(i, j); + } + + // Merge cell i into j + public void mergeCell(int i, int j, int nowInSec) + { + if (isCounter) + mergeCounterCell(this, i, this, j, this, j, nowInSec); + else + mergeRegularCell(this, i, this, j, this, j, nowInSec); + } + + private static boolean handleNoCellCase(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged) + { + if (!d1.hasCell(i1)) + { + if (d2.hasCell(i2)) + d2.moveCell(i2, merged, iMerged); + return true; + } + if (!d2.hasCell(i2)) + { + d1.moveCell(i1, merged, iMerged); + return true; + } + return false; + } + + public static void mergeRegularCell(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged, int nowInSec) + { + if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged)) + return; + + Conflicts.Resolution res = Conflicts.resolveRegular(d1.livenessInfos.timestamp(i1), + d1.livenessInfos.isLive(i1, nowInSec), + d1.livenessInfos.localDeletionTime(i1), + d1.values[i1], + d2.livenessInfos.timestamp(i2), + d2.livenessInfos.isLive(i2, nowInSec), + d2.livenessInfos.localDeletionTime(i2), + d2.values[i2]); + + assert res != Conflicts.Resolution.MERGE; + if (res == Conflicts.Resolution.LEFT_WINS) + d1.moveCell(i1, merged, iMerged); + else + d2.moveCell(i2, merged, iMerged); + } + + public static void mergeCounterCell(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged, int nowInSec) + { + if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged)) + return; + + Conflicts.Resolution res = Conflicts.resolveCounter(d1.livenessInfos.timestamp(i1), + d1.livenessInfos.isLive(i1, nowInSec), + d1.values[i1], + d2.livenessInfos.timestamp(i2), + d2.livenessInfos.isLive(i2, nowInSec), + d2.values[i2]); + + switch (res) + { + case LEFT_WINS: + d1.moveCell(i1, merged, iMerged); + break; + case RIGHT_WINS: + d2.moveCell(i2, merged, iMerged); + break; + default: + merged.values[iMerged] = Conflicts.mergeCounterValues(d1.values[i1], d2.values[i2]); + if (d1.livenessInfos.timestamp(i1) > d2.livenessInfos.timestamp(i2)) + merged.livenessInfos.set(iMerged, d1.livenessInfos.timestamp(i1), d1.livenessInfos.ttl(i1), d1.livenessInfos.localDeletionTime(i1)); + else + merged.livenessInfos.set(iMerged, d2.livenessInfos.timestamp(i2), d2.livenessInfos.ttl(i2), d2.livenessInfos.localDeletionTime(i2)); + 
break; + } + } + + // Move cell i into j + public void moveCell(int i, int j) + { + moveCell(i, this, j); + } + + public void moveCell(int i, CellData target, int j) + { + if (!hasCell(i) || (target == this && i == j)) + return; + + target.ensureCapacity(j); + + target.values[j] = values[i]; + target.livenessInfos.set(j, livenessInfos.timestamp(i), + livenessInfos.ttl(i), + livenessInfos.localDeletionTime(i)); + } + + public int dataSize() + { + int size = livenessInfos.dataSize(); + for (int i = 0; i < values.length; i++) + if (values[i] != null) + size += values[i].remaining(); + return size; + } + + public void clear() + { + Arrays.fill(values, null); + livenessInfos.clear(); + } + + public long unsharedHeapSizeExcludingData() + { + return ObjectSizes.sizeOnHeapExcludingData(values) + + livenessInfos.unsharedHeapSize(); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("CellData(size=").append(values.length); + if (isCounter) + sb.append(", counter"); + sb.append("){"); + LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor(); + for (int i = 0; i < values.length; i++) + { + if (values[i] == null) + { + sb.append("[null]"); + continue; + } + sb.append("[len(v)=").append(values[i].remaining()); + sb.append(", info=").append(cursor.setTo(livenessInfos, i)); + sb.append("]"); + } + return sb.append("}").toString(); + } + + static class ReusableCell extends AbstractCell + { + private final LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor(); + + private CellData data; + private ColumnDefinition column; + protected int idx; + + ReusableCell setTo(CellData data, ColumnDefinition column, int idx) + { + if (!data.hasCell(idx)) + return null; + + this.data = data; + this.column = column; + this.idx = idx; + + cursor.setTo(data.livenessInfos, idx); + return this; + } + + public ColumnDefinition column() + { + return column; + } + + public boolean isCounterCell() + { + return data.isCounter && !cursor.hasLocalDeletionTime(); + } + + public ByteBuffer value() + { + return data.value(idx); + } + + public LivenessInfo livenessInfo() + { + return cursor; + } + + public CellPath path() + { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/CellPath.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java new file mode 100644 index 0000000..8233ac2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/CellPath.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Objects; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * A path for a cell belonging to a complex column type (non-frozen collection or UDT). + */ +public abstract class CellPath +{ + public static final CellPath BOTTOM = new EmptyCellPath(); + public static final CellPath TOP = new EmptyCellPath(); + + public abstract int size(); + public abstract ByteBuffer get(int i); + + // The only complex we currently have are collections that have only one value. + public static CellPath create(ByteBuffer value) + { + assert value != null; + return new SimpleCellPath(new ByteBuffer[]{ value }); + } + + public int dataSize() + { + int size = 0; + for (int i = 0; i < size(); i++) + size += get(i).remaining(); + return size; + } + + public void digest(MessageDigest digest) + { + for (int i = 0; i < size(); i++) + digest.update(get(i).duplicate()); + } + + @Override + public final int hashCode() + { + int result = 31; + for (int i = 0; i < size(); i++) + result += 31 * Objects.hash(get(i)); + return result; + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof CellPath)) + return false; + + CellPath that = (CellPath)o; + if (this.size() != that.size()) + return false; + + for (int i = 0; i < size(); i++) + if (!Objects.equals(this.get(i), that.get(i))) + return false; + + return true; + } + + public interface Serializer + { + public void serialize(CellPath path, DataOutputPlus out) throws IOException; + public CellPath deserialize(DataInput in) throws IOException; + public long serializedSize(CellPath path, TypeSizes sizes); + public void skip(DataInput in) throws IOException; + } + + static class SimpleCellPath extends CellPath + { + protected final ByteBuffer[] values; + + public SimpleCellPath(ByteBuffer[] values) + { + this.values = values; + } + + public int size() + { + return values.length; + } + + public ByteBuffer get(int i) + { + return values[i]; + } + } + + private static class EmptyCellPath extends CellPath + { + public int size() + { + return 0; + } + + public ByteBuffer get(int i) + { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Cells.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java new file mode 100644 index 0000000..1e329e5 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/Cells.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.Iterator; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * Static methods to work on cells. + */ +public abstract class Cells +{ + private Cells() {} + + /** + * Writes a tombstone cell to the provided writer. + * + * @param writer the {@code Row.Writer} to write the tombstone to. + * @param column the column for the tombstone. + * @param timestamp the timestamp for the tombstone. + * @param localDeletionTime the local deletion time (in seconds) for the tombstone. + */ + public static void writeTombstone(Row.Writer writer, ColumnDefinition column, long timestamp, int localDeletionTime) + { + writer.writeCell(column, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime), null); + } + + /** + * Computes the difference between a cell and the result of merging this + * cell to other cells. + * <p> + * This method is used when cells from multiple sources are merged and we want to + * find for a given source if it was up to date for that cell, and if not, what + * should be sent to the source to repair it. + * + * @param merged the cell that is the result of merging multiple source. + * @param cell the cell from one of the source that has been merged to yied + * {@code merged}. + * @return {@code null} if the source having {@code cell} is up-to-date for that + * cell, or a cell that applied to the source will "repair" said source otherwise. + */ + public static Cell diff(Cell merged, Cell cell) + { + // Note that it's enough to check if merged is a counterCell. If it isn't and + // cell is one, it means that merged is a tombstone with a greater timestamp + // than cell, because that's the only case where reconciling a counter with + // a tombstone don't yield a counter. If that's the case, the normal path will + // return what it should. + if (merged.isCounterCell()) + { + if (merged.livenessInfo().supersedes(cell.livenessInfo())) + return merged; + + // Reconciliation never returns something with a timestamp strictly lower than its operand. This + // means we're in the case where merged.timestamp() == cell.timestamp(). As 1) tombstones + // always win over counters (CASSANDRA-7346) and 2) merged is a counter, it follows that cell + // can't be a tombstone or merged would be one too. + assert !cell.isTombstone(); + + CounterContext.Relationship rel = CounterContext.instance().diff(merged.value(), cell.value()); + return (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT) ? merged : null; + } + return merged.livenessInfo().supersedes(cell.livenessInfo()) ? merged : null; + } + + /** + * Reconciles/merges two cells, one being an update to an existing cell, + * yielding index updates if appropriate. + * <p> + * Note that this method assumes that the provided cells can meaningfully + * be reconciled together, that is that those cells are for the same row and same + * column (and same cell path if the column is complex). + * <p> + * Also note that which cell is provided as {@code existing} and which is + * provided as {@code update} matters for index updates. 
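 * <p>
 * As an illustrative sketch only (assuming {@code clustering}, {@code existingCell},
 * {@code updateCell}, {@code writer} and {@code nowInSec} are in scope), a call with no
 * covering deletion and no secondary index to maintain would look like:
 * <pre>{@code
 * long delta = Cells.reconcile(clustering, existingCell, updateCell,
 *                              DeletionTime.LIVE, writer, nowInSec,
 *                              SecondaryIndexManager.nullUpdater);
 * }</pre>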
+ * + * @param clustering the clustering for the row the cells to merge originate from. + * This is only used for index updates, so this can be {@code null} if + * {@code indexUpdater == SecondaryIndexManager.nullUpdater}. + * @param existing the pre-existing cell, the one that is updated. This can be + * {@code null} if this reconciliation correspond to an insertion. + * @param update the newly added cell, the update. This can be {@code null} out + * of convenience, in which case this function simply copy {@code existing} to + * {@code writer}. + * @param deletion the deletion time that applies to the cells being considered. + * This deletion time may delete both {@code existing} or {@code update}. + * @param writer the row writer to which the result of the reconciliation is written. + * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). + * @param indexUpdater an index updater to which the result of the reconciliation is + * signaled (if relevant, that is if the update is not simply ignored by the reconciliation). + * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed. + * + * @return the timestamp delta between existing and update, or {@code Long.MAX_VALUE} if one + * of them is {@code null} or deleted by {@code deletion}). + */ + public static long reconcile(Clustering clustering, + Cell existing, + Cell update, + DeletionTime deletion, + Row.Writer writer, + int nowInSec, + SecondaryIndexManager.Updater indexUpdater) + { + existing = existing == null || deletion.deletes(existing.livenessInfo()) ? null : existing; + update = update == null || deletion.deletes(update.livenessInfo()) ? null : update; + if (existing == null || update == null) + { + if (update != null) + { + // It's inefficient that we call maybeIndex (which is for primary key indexes) on every cell, but + // we'll need to fix that damn 2ndary index API to avoid that. + updatePKIndexes(clustering, update, nowInSec, indexUpdater); + indexUpdater.insert(clustering, update); + update.writeTo(writer); + } + else if (existing != null) + { + existing.writeTo(writer); + } + return Long.MAX_VALUE; + } + + Cell reconciled = reconcile(existing, update, nowInSec); + reconciled.writeTo(writer); + + // Note that this test rely on reconcile returning either 'existing' or 'update'. That's not true for counters but we don't index them + if (reconciled == update) + { + updatePKIndexes(clustering, update, nowInSec, indexUpdater); + indexUpdater.update(clustering, existing, reconciled); + } + return Math.abs(existing.livenessInfo().timestamp() - update.livenessInfo().timestamp()); + } + + private static void updatePKIndexes(Clustering clustering, Cell cell, int nowInSec, SecondaryIndexManager.Updater indexUpdater) + { + if (indexUpdater != SecondaryIndexManager.nullUpdater && cell.isLive(nowInSec)) + indexUpdater.maybeIndex(clustering, cell.livenessInfo().timestamp(), cell.livenessInfo().ttl(), DeletionTime.LIVE); + } + + /** + * Reconciles/merge two cells. + * <p> + * Note that this method assumes that the provided cells can meaningfully + * be reconciled together, that is that cell are for the same row and same + * column (and same cell path if the column is complex). + * <p> + * This method is commutative over it's cells arguments: {@code reconcile(a, b, n) == reconcile(b, a, n)}. 
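 * <p>
 * Illustrative sketch (assuming {@code c1}, {@code c2} and {@code nowInSec} are in scope):
 * because a regular (non-counter) reconciliation always returns one of its inputs, callers
 * may use reference equality to see which side won:
 * <pre>{@code
 * Cell winner = Cells.reconcile(c1, c2, nowInSec);
 * boolean secondWon = (winner == c2); // counters may instead return a newly allocated cell
 * }</pre>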
+ * + * @param c1 the first cell participating in the reconciliation. + * @param c2 the second cell participating in the reconciliation. + * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). + * + * @return a cell corresponding to the reconciliation of {@code c1} and {@code c2}. + * For non-counter cells, this will always be either {@code c1} or {@code c2}, but for + * counter cells this can be a newly allocated cell. + */ + public static Cell reconcile(Cell c1, Cell c2, int nowInSec) + { + if (c1 == null) + return c2 == null ? null : c2; + if (c2 == null) + return c1; + + if (c1.isCounterCell() || c2.isCounterCell()) + { + Conflicts.Resolution res = Conflicts.resolveCounter(c1.livenessInfo().timestamp(), + c1.isLive(nowInSec), + c1.value(), + c2.livenessInfo().timestamp(), + c2.isLive(nowInSec), + c2.value()); + + switch (res) + { + case LEFT_WINS: return c1; + case RIGHT_WINS: return c2; + default: + ByteBuffer merged = Conflicts.mergeCounterValues(c1.value(), c2.value()); + LivenessInfo mergedInfo = c1.livenessInfo().mergeWith(c2.livenessInfo()); + + // We save allocating a new cell object if it turns out that one cell was + // a complete superset of the other + if (merged == c1.value() && mergedInfo == c1.livenessInfo()) + return c1; + else if (merged == c2.value() && mergedInfo == c2.livenessInfo()) + return c2; + else // merge clocks and timestamps. + return create(c1.column(), true, merged, mergedInfo, null); + } + } + + Conflicts.Resolution res = Conflicts.resolveRegular(c1.livenessInfo().timestamp(), + c1.isLive(nowInSec), + c1.livenessInfo().localDeletionTime(), + c1.value(), + c2.livenessInfo().timestamp(), + c2.isLive(nowInSec), + c2.livenessInfo().localDeletionTime(), + c2.value()); + assert res != Conflicts.Resolution.MERGE; + return res == Conflicts.Resolution.LEFT_WINS ? c1 : c2; + } + + /** + * Computes the reconciliation of a complex column given its pre-existing + * cells and the ones it is updated with, and generating index update if + * appropriate. + * <p> + * Note that this method assumes that the provided cells can meaningfully + * be reconciled together, that is that the cells are for the same row and same + * complex column. + * <p> + * Also note that which cells is provided as {@code existing} and which are + * provided as {@code update} matters for index updates. + * + * @param clustering the clustering for the row the cells to merge originate from. + * This is only used for index updates, so this can be {@code null} if + * {@code indexUpdater == SecondaryIndexManager.nullUpdater}. + * @param column the complex column the cells are for. + * @param existing the pre-existing cells, the ones that are updated. This can be + * {@code null} if this reconciliation correspond to an insertion. + * @param update the newly added cells, the update. This can be {@code null} out + * of convenience, in which case this function simply copy the cells from + * {@code existing} to {@code writer}. + * @param deletion the deletion time that applies to the cells being considered. + * This deletion time may delete cells in both {@code existing} and {@code update}. + * @param writer the row writer to which the result of the reconciliation is written. 
+ * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). + * @param indexUpdater an index updater to which the result of the reconciliation is + * signaled (if relevant, that is if the updates are not simply ignored by the reconciliation). + * This cannot be {@code null} but {@code SecondaryIndexManager.nullUpdater} can be passed. + * + * @return the smallest timestamp delta between corresponding cells from existing and update. A + * timestamp delta being computed as the difference between a cell from {@code update} and the + * cell in {@code existing} having the same cell path (if such cell exists). If the intersection + * of cells from {@code existing} and {@code update} having the same cell path is empty, this + * returns {@code Long.MAX_VALUE}. + */ + public static long reconcileComplex(Clustering clustering, + ColumnDefinition column, + Iterator<Cell> existing, + Iterator<Cell> update, + DeletionTime deletion, + Row.Writer writer, + int nowInSec, + SecondaryIndexManager.Updater indexUpdater) + { + Comparator<CellPath> comparator = column.cellPathComparator(); + Cell nextExisting = getNext(existing); + Cell nextUpdate = getNext(update); + long timeDelta = Long.MAX_VALUE; + while (nextExisting != null || nextUpdate != null) + { + int cmp = nextExisting == null ? 1 + : (nextUpdate == null ? -1 + : comparator.compare(nextExisting.path(), nextUpdate.path())); + if (cmp < 0) + { + reconcile(clustering, nextExisting, null, deletion, writer, nowInSec, indexUpdater); + nextExisting = getNext(existing); + } + else if (cmp > 0) + { + reconcile(clustering, null, nextUpdate, deletion, writer, nowInSec, indexUpdater); + nextUpdate = getNext(update); + } + else + { + timeDelta = Math.min(timeDelta, reconcile(clustering, nextExisting, nextUpdate, deletion, writer, nowInSec, indexUpdater)); + nextExisting = getNext(existing); + nextUpdate = getNext(update); + } + } + return timeDelta; + } + + private static Cell getNext(Iterator<Cell> iterator) + { + return iterator == null || !iterator.hasNext() ? null : iterator.next(); + } + + /** + * Creates a simple cell. + * <p> + * Note that in general cell objects are created by the container they are in and so this method should + * only be used in a handful of cases when we know it's the right thing to do. + * + * @param column the column for the cell to create. + * @param isCounter whether the create cell should be a counter one. + * @param value the value for the cell. + * @param info the liveness info for the cell. + * @param path the cell path for the cell. + * @return the newly allocated cell object. 
+ */ + public static Cell create(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + return new SimpleCell(column, isCounter, value, info, path); + } + + private static class SimpleCell extends AbstractCell + { + private final ColumnDefinition column; + private final boolean isCounter; + private final ByteBuffer value; + private final LivenessInfo info; + private final CellPath path; + + private SimpleCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + this.column = column; + this.isCounter = isCounter; + this.value = value; + this.info = info.takeAlias(); + this.path = path; + } + + public ColumnDefinition column() + { + return column; + } + + public boolean isCounterCell() + { + return isCounter; + } + + public ByteBuffer value() + { + return value; + } + + public LivenessInfo livenessInfo() + { + return info; + } + + public CellPath path() + { + return path; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java new file mode 100644 index 0000000..ea472eb --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.util.Iterator; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.DeletionTime; + +public class ColumnData +{ + private final ColumnDefinition column; + private final Cell cell; + private final Iterator<Cell> cells; + private final DeletionTime complexDeletion; + + ColumnData(ColumnDefinition column, Cell cell, Iterator<Cell> cells, DeletionTime complexDeletion) + { + assert column != null && (cell != null || (column.isComplex() && cells != null && complexDeletion != null)); + + this.column = column; + this.cell = cell; + this.cells = cells; + this.complexDeletion = complexDeletion; + } + + public ColumnDefinition column() + { + return column; + } + + public Cell cell() + { + return cell; + } + + public Iterator<Cell> cells() + { + return cells; + } + + public DeletionTime complexDeletion() + { + return complexDeletion; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java new file mode 100644 index 0000000..75df874 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.UnmodifiableIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * Holds cells data and complex deletions for the complex columns of one or more rows. + * <p> + * Contrarily to {@code SimpleRowDataBlock}, each complex column can have multiple cells and + * we thus can't use a similar dense encoding. Instead, we still store the actual cell data + * in a {@code CellData} object, but we add a level of indirection (the cellIdx array in + * {@link ComplexCellBlock}) which for every column of every row stores 2 indexes: the index + * in the {@code CellData} where the first cell for this column is, and the the index of the + * last cell (or rather, the index to the first cell that does not belong to that column). + * <p> + * What makes this a little bit more complicated however is that in some cases (for + * {@link PartitionUpdate} typically), we need to be able to swap rows inside a + * {@code ComplexRowDataBlock} and the extra level of indirection makes that more complex. 
+ * So in practice, we have 2 separate sub-implementation of a {@code ComplexRowDataBlock}: + * - The first one, {@code SimpleComplexRowDataBlock} does not support swapping rows + * (and is thus only used when we don't need to) but it uses a single {@code CellData} + * for all the rows stored. + * - The second one, {@code SortableComplexRowDataBlock}, uses one separate {@code CellData} + * per row (in fact, a {@code ComplexCellBlock} which groups the cell data with the + * indexing array discussed above) and simply keeps those per-row block in a list. It + * is thus less compact in memory but make the swapping of rows trivial. + */ +public abstract class ComplexRowDataBlock +{ + private static final Logger logger = LoggerFactory.getLogger(ComplexRowDataBlock.class); + + private final Columns columns; + + // For each complex column, it's deletion time (if any): the nth complex column of row i + // will have it's deletion time at complexDelTimes[(i * ccs) + n] where ccs it the number + // of complex columns in 'columns'. + final DeletionTimeArray complexDelTimes; + + protected ComplexRowDataBlock(Columns columns, int rows) + { + this.columns = columns; + + int columnCount = rows * columns.complexColumnCount(); + this.complexDelTimes = new DeletionTimeArray(columnCount); + } + + public static ComplexRowDataBlock create(Columns columns, int rows, boolean sortable, boolean isCounter) + { + return sortable + ? new SortableComplexRowDataBlock(columns, rows, isCounter) + : new SimpleComplexRowDataBlock(columns, rows, isCounter); + } + + public Columns columns() + { + return columns; + } + + public CellData cellData(int row) + { + return cellBlock(row).data; + } + + public int cellIdx(int row, ColumnDefinition c, CellPath path) + { + ComplexCellBlock block = cellBlock(row); + if (block == null) + return -1; + + int base = cellBlockBase(row); + int i = base + 2 * columns.complexIdx(c, 0); + + int start = block.cellIdx[i]; + int end = block.cellIdx[i+1]; + + if (i >= block.cellIdx.length || end <= start) + return -1; + + return Arrays.binarySearch(block.complexPaths, start, end, path, c.cellPathComparator()); + } + + // The following methods abstract the fact that we have 2 sub-implementations: both + // implementation will use a ComplexCellBlock to store a row, but one will use one + // ComplexCellBlock per row, while the other will store all rows into the same block. + + // Returns the cell block for a given row. Can return null if the asked row has no data. + protected abstract ComplexCellBlock cellBlock(int row); + // Same as cellBlock(), but create the proper block if the row doesn't exists and return it. + protected abstract ComplexCellBlock cellBlockForWritting(int row); + // The index in the block returned by cellBlock()/cellBlockFroWriting() where the row starts. 
+ protected abstract int cellBlockBase(int row); + + protected abstract void swapCells(int i, int j); + protected abstract void mergeCells(int i, int j, int nowInSec); + protected abstract void moveCells(int i, int j); + + protected abstract long cellDataUnsharedHeapSizeExcludingData(); + protected abstract int dataCellSize(); + protected abstract void clearCellData(); + + // Swap row i and j + public void swap(int i, int j) + { + swapCells(i, j); + + int s = columns.complexColumnCount(); + for (int k = 0; k < s; k++) + complexDelTimes.swap(i * s + k, j * s + k); + } + + // Merge row i into j + public void merge(int i, int j, int nowInSec) + { + assert i > j; + + mergeCells(i, j, nowInSec); + + int s = columns.complexColumnCount(); + if (i * s >= complexDelTimes.size()) + return; + + for (int k = 0; k < s; k++) + if (complexDelTimes.supersedes(i * s + k, j * s + k)) + complexDelTimes.move(i * s + k, j * s + k); + } + + // Move row i into j + public void move(int i, int j) + { + moveCells(i, j); + ensureDelTimesCapacity(Math.max(i, j)); + int s = columns.complexColumnCount(); + for (int k = 0; k < s; k++) + complexDelTimes.move(i * s + k, j * s + k); + } + + public long unsharedHeapSizeExcludingData() + { + return cellDataUnsharedHeapSizeExcludingData() + complexDelTimes.unsharedHeapSize(); + } + + public int dataSize() + { + return dataCellSize() + complexDelTimes.dataSize(); + } + + public CellWriter cellWriter(boolean inOrderCells) + { + return new CellWriter(inOrderCells); + } + + public int complexDeletionIdx(int row, ColumnDefinition column) + { + int baseIdx = columns.complexIdx(column, 0); + if (baseIdx < 0) + return -1; + + int idx = (row * columns.complexColumnCount()) + baseIdx; + return idx < complexDelTimes.size() ? idx : -1; + } + + public boolean hasComplexDeletion(int row) + { + int base = row * columns.complexColumnCount(); + for (int i = base; i < base + columns.complexColumnCount(); i++) + if (!complexDelTimes.isLive(i)) + return true; + return false; + } + + public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path) + { + CellData data = cellData(row); + assert data != null; + int idx = cellIdx(row, column, path); + return data.value(idx); + } + + public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value) + { + CellData data = cellData(row); + assert data != null; + int idx = cellIdx(row, column, path); + data.setValue(idx, value); + } + + public static ReusableIterator reusableComplexCells() + { + return new ReusableIterator(); + } + + public static DeletionTimeArray.Cursor complexDeletionCursor() + { + return new DeletionTimeArray.Cursor(); + } + + public static ReusableIterator reusableIterator() + { + return new ReusableIterator(); + } + + public void clear() + { + clearCellData(); + complexDelTimes.clear(); + } + + private void ensureDelTimesCapacity(int rowToSet) + { + int originalCapacity = complexDelTimes.size() / columns.complexColumnCount(); + if (rowToSet < originalCapacity) + return; + + int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet); + complexDelTimes.resize(newCapacity * columns.complexColumnCount()); + } + + /** + * Simple sub-implementation that doesn't support swapping/sorting rows. + * The cell data for every row is stored in the same contiguous {@code ComplexCellBloc} + * object. 
+ */ + private static class SimpleComplexRowDataBlock extends ComplexRowDataBlock + { + private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleComplexRowDataBlock(Columns.NONE, 0, false)); + + private final ComplexCellBlock cells; + + private SimpleComplexRowDataBlock(Columns columns, int rows, boolean isCounter) + { + super(columns, rows); + this.cells = new ComplexCellBlock(columns, rows, isCounter); + } + + protected ComplexCellBlock cellBlock(int row) + { + return cells; + } + + protected ComplexCellBlock cellBlockForWritting(int row) + { + cells.ensureCapacity(row); + return cells; + } + + protected int cellBlockBase(int row) + { + return 2 * row * columns().complexColumnCount(); + } + + // Swap cells from row i and j + public void swapCells(int i, int j) + { + throw new UnsupportedOperationException(); + } + + // Merge cells from row i into j + public void mergeCells(int i, int j, int nowInSec) + { + throw new UnsupportedOperationException(); + } + + // Move cells from row i into j + public void moveCells(int i, int j) + { + throw new UnsupportedOperationException(); + } + + protected long cellDataUnsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + cells.unsharedHeapSizeExcludingData(); + } + + protected int dataCellSize() + { + return cells.dataSize(); + } + + protected void clearCellData() + { + cells.clear(); + } + } + + /** + * Sub-implementation that support swapping/sorting rows. + * The data for each row is stored in a different {@code ComplexCellBlock} object, + * making swapping rows easy. + */ + private static class SortableComplexRowDataBlock extends ComplexRowDataBlock + { + private static final long EMPTY_SIZE = ObjectSizes.measure(new SortableComplexRowDataBlock(Columns.NONE, 0, false)); + + // The cell data for each row. + private final List<ComplexCellBlock> cells; + private final boolean isCounter; + + private SortableComplexRowDataBlock(Columns columns, int rows, boolean isCounter) + { + super(columns, rows); + this.cells = new ArrayList<>(rows); + this.isCounter = isCounter; + } + + protected ComplexCellBlock cellBlockForWritting(int row) + { + if (row < cells.size()) + return cells.get(row); + + // Make sure the list of size 'row-1' before the insertion, adding nulls if necessary, + // so that we do are writing row 'row' + ensureCapacity(row-1); + + assert row == cells.size(); + ComplexCellBlock block = new ComplexCellBlock(columns(), 1, isCounter); + cells.add(block); + return block; + } + + private void ensureCapacity(int row) + { + while (row >= cells.size()) + cells.add(null); + } + + protected ComplexCellBlock cellBlock(int row) + { + return row >= cells.size() ? 
null : cells.get(row); + } + + protected int cellBlockBase(int row) + { + return 0; + } + + // Swap row i and j + protected void swapCells(int i, int j) + { + int max = Math.max(i, j); + if (max >= cells.size()) + ensureCapacity(max); + + ComplexCellBlock block = cells.get(j); + move(i, j); + cells.set(i, block); + } + + // Merge row i into j + protected void mergeCells(int i, int j, int nowInSec) + { + assert i > j; + if (i >= cells.size()) + return; + + ComplexCellBlock b1 = cells.get(i); + if (b1 == null) + return; // nothing to merge into j + + ComplexCellBlock b2 = cells.get(j); + if (b2 == null) + { + cells.set(j, b1); + return; + } + + ComplexCellBlock merged = new ComplexCellBlock(columns(), 1, isCounter); + + int idxMerged = 0; + int s = columns().complexColumnCount(); + for (int k = 0; k < s; k++) + { + ColumnDefinition column = columns().getComplex(k); + Comparator<CellPath> comparator = column.cellPathComparator(); + + merged.cellIdx[2 * k] = idxMerged; + + int idx1 = b1.cellIdx[2 * k]; + int end1 = b1.cellIdx[2 * k + 1]; + int idx2 = b2.cellIdx[2 * k]; + int end2 = b2.cellIdx[2 * k + 1]; + + while (idx1 < end1 || idx2 < end2) + { + int cmp = idx1 >= end1 ? 1 + : (idx2 >= end2 ? -1 + : comparator.compare(b1.complexPaths[idx1], b2.complexPaths[idx2])); + + if (cmp == 0) + merge(b1, idx1++, b2, idx2++, merged, idxMerged++, nowInSec); + else if (cmp < 0) + copy(b1, idx1++, merged, idxMerged++); + else + copy(b2, idx2++, merged, idxMerged++); + } + + merged.cellIdx[2 * k + 1] = idxMerged; + } + + cells.set(j, merged); + } + + private void copy(ComplexCellBlock fromBlock, int fromIdx, ComplexCellBlock toBlock, int toIdx) + { + fromBlock.data.moveCell(fromIdx, toBlock.data, toIdx); + toBlock.ensureComplexPathsCapacity(toIdx); + toBlock.complexPaths[toIdx] = fromBlock.complexPaths[fromIdx]; + } + + private void merge(ComplexCellBlock b1, int idx1, ComplexCellBlock b2, int idx2, ComplexCellBlock mergedBlock, int mergedIdx, int nowInSec) + { + if (isCounter) + CellData.mergeCounterCell(b1.data, idx1, b2.data, idx2, mergedBlock.data, mergedIdx, nowInSec); + else + CellData.mergeRegularCell(b1.data, idx1, b2.data, idx2, mergedBlock.data, mergedIdx, nowInSec); + mergedBlock.ensureComplexPathsCapacity(mergedIdx); + mergedBlock.complexPaths[mergedIdx] = b1.complexPaths[idx1]; + } + + // Move row i into j + protected void moveCells(int i, int j) + { + int max = Math.max(i, j); + if (max >= cells.size()) + ensureCapacity(max); + + cells.set(j, cells.get(i)); + } + + protected long cellDataUnsharedHeapSizeExcludingData() + { + long size = EMPTY_SIZE; + for (ComplexCellBlock block : cells) + if (block != null) + size += block.unsharedHeapSizeExcludingData(); + return size; + } + + protected int dataCellSize() + { + int size = 0; + for (ComplexCellBlock block : cells) + if (block != null) + size += block.dataSize(); + return size; + } + + protected void clearCellData() + { + for (ComplexCellBlock block : cells) + if (block != null) + block.clear(); + } + } + + /** + * Stores complex column cell data for one or more rows. + * <p> + * On top of a {@code CellData} object, this stores an index to where the cells + * of a given column start and stop in that {@code CellData} object (cellIdx) + * as well as the cell path for the cells (since {@code CellData} doesn't have those). + */ + private static class ComplexCellBlock + { + private final Columns columns; + + /* + * For a given complex column c, we have to store an unknown number of + * cells. 
So for each column of each row, we keep pointers (in data) + * to the start and end of the cells for this column (cells for a given + * columns are thus stored contiguously). + * For instance, if columns has 'c' complex columns, the x-th column of + * row 'n' will have it's cells in data at indexes + * [cellIdx[2 * (n * c + x)], cellIdx[2 * (n * c + x) + 1]) + */ + private int[] cellIdx; + + private final CellData data; + + // The first free idx in data (for writing purposes). + private int idx; + + // THe (complex) cells path. This is indexed exactly like the cells in data (so through cellIdx). + private CellPath[] complexPaths; + + public ComplexCellBlock(Columns columns, int rows, boolean isCounter) + { + this.columns = columns; + + int columnCount = columns.complexColumnCount(); + this.cellIdx = new int[columnCount * 2 * rows]; + + // We start with an estimated 4 cells per complex column. The arrays + // will grow if needed so this is just a somewhat random estimation. + int cellCount = columnCount * 4; + this.data = new CellData(cellCount, isCounter); + this.complexPaths = new CellPath[cellCount]; + } + + public void addCell(int columnIdx, ByteBuffer value, LivenessInfo info, CellPath path, boolean isFirstCell) + { + if (isFirstCell) + cellIdx[columnIdx] = idx; + cellIdx[columnIdx + 1] = idx + 1; + + data.setCell(idx, value, info); + ensureComplexPathsCapacity(idx); + complexPaths[idx] = path; + idx++; + } + + public long unsharedHeapSizeExcludingData() + { + long size = ObjectSizes.sizeOfArray(cellIdx) + + data.unsharedHeapSizeExcludingData() + + ObjectSizes.sizeOfArray(complexPaths); + + for (int i = 0; i < complexPaths.length; i++) + if (complexPaths[i] != null) + size += ((MemtableRowData.BufferCellPath)complexPaths[i]).unsharedHeapSizeExcludingData(); + return size; + } + + public int dataSize() + { + int size = data.dataSize() + cellIdx.length * 4; + + for (int i = 0; i < complexPaths.length; i++) + if (complexPaths[i] != null) + size += complexPaths[i].dataSize(); + + return size; + } + + private void ensureCapacity(int rowToSet) + { + int columnCount = columns.complexColumnCount(); + int originalCapacity = cellIdx.length / (2 * columnCount); + if (rowToSet < originalCapacity) + return; + + int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet); + cellIdx = Arrays.copyOf(cellIdx, newCapacity * 2 * columnCount); + } + + private void ensureComplexPathsCapacity(int idxToSet) + { + int originalCapacity = complexPaths.length; + if (idxToSet < originalCapacity) + return; + + int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, idxToSet); + complexPaths = Arrays.copyOf(complexPaths, newCapacity); + } + + public void clear() + { + data.clear(); + Arrays.fill(cellIdx, 0); + Arrays.fill(complexPaths, null); + idx = 0; + } + } + + /** + * Simple sublcassing of {@code CellData.ReusableCell} to include the cell path. + */ + private static class ReusableCell extends CellData.ReusableCell + { + private ComplexCellBlock cellBlock; + + ReusableCell setTo(ComplexCellBlock cellBlock, ColumnDefinition column, int idx) + { + this.cellBlock = cellBlock; + super.setTo(cellBlock.data, column, idx); + return this; + } + + @Override + public CellPath path() + { + return cellBlock.complexPaths[idx]; + } + } + + /** + * An iterator over the complex cells of a given row. + * This is used both to iterate over all the (complex) cells of the row, or only on the cells + * of a given column within the row. 
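 * <p>
 * Illustrative usage sketch (where {@code dataBlock}, {@code row} and {@code column} are in
 * scope and {@code process} stands for whatever the caller does with each cell):
 * <pre>{@code
 * ReusableIterator iter = reusableComplexCells();
 * if (iter.setTo(dataBlock, row, column) != null)
 *     while (iter.hasNext())
 *         process(iter.next());
 * }</pre>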
+ */ + static class ReusableIterator extends UnmodifiableIterator<Cell> + { + private ComplexCellBlock cellBlock; + private final ReusableCell cell = new ReusableCell(); + + // The idx in 'cellBlock' of the row we're iterating over + private int rowIdx; + + // columnIdx is the index in 'columns' of the current column we're iterating over. + // 'endColumnIdx' is the value of 'columnIdx' at which we should stop iterating. + private int columnIdx; + private int endColumnIdx; + + // idx is the index in 'cellBlock.data' of the current cell this iterator is on. 'endIdx' + // is the index in 'cellBlock.data' of the first cell that does not belong to the current + // column we're iterating over (the one pointed by columnIdx). + private int idx; + private int endIdx; + + private ReusableIterator() + { + } + + // Sets the iterator for iterating over the cells of 'column' in 'row' + public ReusableIterator setTo(ComplexRowDataBlock dataBlock, int row, ColumnDefinition column) + { + if (dataBlock == null) + { + this.cellBlock = null; + return null; + } + + this.cellBlock = dataBlock.cellBlock(row); + if (cellBlock == null) + return null; + + rowIdx = dataBlock.cellBlockBase(row); + + columnIdx = dataBlock.columns.complexIdx(column, 0); + if (columnIdx < 0) + return null; + + // We only want the cells of 'column', so stop as soon as we've reach the next column + endColumnIdx = columnIdx + 1; + + resetCellIdx(); + + return endIdx <= idx ? null : this; + } + + // Sets the iterator for iterating over all the cells of 'row' + public ReusableIterator setTo(ComplexRowDataBlock dataBlock, int row) + { + if (dataBlock == null) + { + this.cellBlock = null; + return null; + } + + this.cellBlock = dataBlock.cellBlock(row); + if (cellBlock == null) + return null; + + rowIdx = dataBlock.cellBlockBase(row); + + // We want to iterator over all columns + columnIdx = 0; + endColumnIdx = dataBlock.columns.complexColumnCount(); + + // Not every column might have cells, so set thing up so we're on the + // column having cells (with idx and endIdx sets properly for that column) + findNextColumnWithCells(); + return columnIdx < endColumnIdx ? null : this; + } + + private void findNextColumnWithCells() + { + while (columnIdx < endColumnIdx) + { + resetCellIdx(); + if (idx < endIdx) + return; + ++columnIdx; + } + } + + // Provided that columnIdx and rowIdx are properly set, sets idx to the first + // cells of the pointed column, and endIdx to the first cell not for said column + private void resetCellIdx() + { + int i = rowIdx + 2 * columnIdx; + if (i >= cellBlock.cellIdx.length) + { + idx = 0; + endIdx = 0; + } + else + { + idx = cellBlock.cellIdx[i]; + endIdx = cellBlock.cellIdx[i + 1]; + } + } + + public boolean hasNext() + { + if (cellBlock == null) + return false; + + if (columnIdx >= endColumnIdx) + return false; + + // checks if we have more cells for the current column + if (idx < endIdx) + return true; + + // otherwise, find the next column that has cells. 
+            ++columnIdx;
+            findNextColumnWithCells();
+
+            return columnIdx < endColumnIdx;
+        }
+
+        public Cell next()
+        {
+            return cell.setTo(cellBlock, cellBlock.columns.getComplex(columnIdx), idx++);
+        }
+    }
+
+    public class CellWriter
+    {
+        private final boolean inOrderCells;
+
+        private int base;
+        private int row;
+        private int lastColumnIdx;
+
+        public CellWriter(boolean inOrderCells)
+        {
+            this.inOrderCells = inOrderCells;
+        }
+
+        public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            assert path != null;
+
+            ComplexCellBlock cellBlock = cellBlockForWritting(row);
+
+            lastColumnIdx = columns.complexIdx(column, inOrderCells ? lastColumnIdx : 0);
+            assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns;
+
+            int idx = cellBlockBase(row) + 2 * lastColumnIdx;
+
+            int start = cellBlock.cellIdx[idx];
+            int end = cellBlock.cellIdx[idx + 1];
+
+            cellBlock.addCell(idx, value, info, path, end <= start);
+        }
+
+        public void setComplexDeletion(ColumnDefinition column, DeletionTime deletionTime)
+        {
+            int columnIdx = base + columns.complexIdx(column, 0);
+            ensureDelTimesCapacity(row);
+            complexDelTimes.set(columnIdx, deletionTime);
+        }
+
+        public void endOfRow()
+        {
+            base += columns.complexColumnCount();
+            lastColumnIdx = 0;
+            ++row;
+        }
+
+        public void reset()
+        {
+            base = 0;
+            row = 0;
+            lastColumnIdx = 0;
+            clearCellData();
+            complexDelTimes.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/CounterCells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/CounterCells.java b/src/java/org/apache/cassandra/db/rows/CounterCells.java
new file mode 100644
index 0000000..732f195
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/CounterCells.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.db.context.CounterContext;
+
+public abstract class CounterCells
+{
+    private CounterCells() {}
+
+    private static final CounterContext contextManager = CounterContext.instance();
+
+    public static boolean hasLegacyShards(Cell cell)
+    {
+        return contextManager.hasLegacyShards(cell.value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/FilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/FilteringRow.java b/src/java/org/apache/cassandra/db/rows/FilteringRow.java
new file mode 100644
index 0000000..fb8f448
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/FilteringRow.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+
+public abstract class FilteringRow extends WrappingRow
+{
+    public static FilteringRow columnsFilteringRow(final Columns toInclude)
+    {
+        return new FilteringRow()
+        {
+            @Override
+            protected boolean include(ColumnDefinition column)
+            {
+                return toInclude.contains(column);
+            }
+        };
+    }
+
+    public static FilteringRow columnsFilteringRow(final ColumnFilter toInclude)
+    {
+        return new FilteringRow()
+        {
+            @Override
+            protected boolean include(ColumnDefinition column)
+            {
+                return toInclude.includes(column);
+            }
+
+            @Override
+            protected boolean include(Cell cell)
+            {
+                return toInclude.includes(cell);
+            }
+        };
+    }
+
+    public FilteringRow setTo(Row row)
+    {
+        super.setTo(row);
+        return this;
+    }
+
+    /**
+     * The following functions are meant to be overridden based on needs.
+     */
+    protected boolean include(Cell cell) { return true; }
+    protected boolean include(LivenessInfo info) { return true; }
+    protected boolean include(DeletionTime dt) { return true; }
+    protected boolean include(ColumnDefinition column) { return true; }
+    protected boolean include(ColumnDefinition c, DeletionTime dt) { return true; }
+
+    // Subclasses that override this should be careful to call the overridden version first, or this might break FilteringRow (i.e. it might not
+    // filter what it should).
+    @Override
+    protected Cell filterCell(Cell cell)
+    {
+        return include(cell.column()) && include(cell.livenessInfo()) && include(cell) ? cell : null;
+    }
+
+    protected DeletionTime filterDeletionTime(DeletionTime deletion)
+    {
+        return deletion == null || !include(deletion)
+             ? DeletionTime.LIVE
+             : deletion;
+    }
+
+    @Override
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        LivenessInfo info = super.primaryKeyLivenessInfo();
+        return include(info) ? info : LivenessInfo.NONE;
+    }
+
+    @Override
+    public DeletionTime deletion()
+    {
+        DeletionTime deletion = super.deletion();
+        return include(deletion) ? deletion : DeletionTime.LIVE;
+    }
+
+    @Override
+    public Iterator<Cell> getCells(ColumnDefinition c)
+    {
+        // slightly speed things up if we know we don't care at all about the column
+        if (!include(c))
+            return null;
+
+        return super.getCells(c);
+    }
+
+    @Override
+    public DeletionTime getDeletion(ColumnDefinition c)
+    {
+        if (!include(c))
+            return DeletionTime.LIVE;
+
+        DeletionTime dt = super.getDeletion(c);
+        return include(c, dt) ? dt : DeletionTime.LIVE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java b/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
new file mode 100644
index 0000000..fd1c0a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.db.*;
+
+public class FilteringRowIterator extends WrappingUnfilteredRowIterator
+{
+    private final FilteringRow filter;
+    private Unfiltered next;
+
+    public FilteringRowIterator(UnfilteredRowIterator toFilter)
+    {
+        super(toFilter);
+        this.filter = makeRowFilter();
+    }
+
+    // Subclasses that want to filter within rows should override this. Note that since FilteringRow
+    // is a reusable object, this method won't be called for every filtered row and the same filter will
+    // be used for every regular row. However, it can still be called twice if we have a static row
+    // to filter, because we don't want to use the same object for it, as that makes for weird behavior
+    // if calls to staticRow() are interleaved with hasNext().
+    protected FilteringRow makeRowFilter()
+    {
+        return null;
+    }
+
+    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+    {
+        return true;
+    }
+
+    // Allows modifying the range tombstone marker returned. This is called *after* includeRangeTombstoneMarker has been called.
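+    // The 'reversed' argument it receives is this iterator's isReverseOrder() flag.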
+    protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+    {
+        return marker;
+    }
+
+    protected boolean includeRow(Row row)
+    {
+        return true;
+    }
+
+    protected boolean includePartitionDeletion(DeletionTime dt)
+    {
+        return true;
+    }
+
+    @Override
+    public DeletionTime partitionLevelDeletion()
+    {
+        DeletionTime dt = wrapped.partitionLevelDeletion();
+        return includePartitionDeletion(dt) ? dt : DeletionTime.LIVE;
+    }
+
+    @Override
+    public Row staticRow()
+    {
+        Row row = super.staticRow();
+        if (row == Rows.EMPTY_STATIC_ROW)
+            return row;
+
+        FilteringRow filter = makeRowFilter();
+        if (filter != null)
+            row = filter.setTo(row);
+
+        return !row.isEmpty() && includeRow(row) ? row : Rows.EMPTY_STATIC_ROW;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        if (next != null)
+            return true;
+
+        while (super.hasNext())
+        {
+            Unfiltered unfiltered = super.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+            {
+                Row row = filter == null ? (Row) unfiltered : filter.setTo((Row) unfiltered);
+                if (!row.isEmpty() && includeRow(row))
+                {
+                    next = row;
+                    return true;
+                }
+            }
+            else
+            {
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
+                if (includeRangeTombstoneMarker(marker))
+                {
+                    next = filterRangeTombstoneMarker(marker, isReverseOrder());
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Unfiltered next()
+    {
+        if (next == null)
+            hasNext();
+
+        Unfiltered toReturn = next;
+        next = null;
+        return toReturn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
new file mode 100644
index 0000000..6241a89
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Abstract class to create UnfilteredRowIterators that lazily initialize themselves.
+ *
+ * This is used during partition range queries when we know the partition key but want
+ * to defer the initialization of the rest of the UnfilteredRowIterator until we need that information.
+ * See {@link BigTableScanner#KeyScanningIterator} for instance.
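+ *
+ * Concrete subclasses only need to implement {@link #initializeIterator}; every method except
+ * partitionKey() and close() forces the initialization of the underlying iterator on first use.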
+ */
+public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+{
+    private final DecoratedKey partitionKey;
+
+    private UnfilteredRowIterator iterator;
+
+    public LazilyInitializedUnfilteredRowIterator(DecoratedKey partitionKey)
+    {
+        this.partitionKey = partitionKey;
+    }
+
+    protected abstract UnfilteredRowIterator initializeIterator();
+
+    private void maybeInit()
+    {
+        if (iterator == null)
+            iterator = initializeIterator();
+    }
+
+    public CFMetaData metadata()
+    {
+        maybeInit();
+        return iterator.metadata();
+    }
+
+    public PartitionColumns columns()
+    {
+        maybeInit();
+        return iterator.columns();
+    }
+
+    public boolean isReverseOrder()
+    {
+        maybeInit();
+        return iterator.isReverseOrder();
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        maybeInit();
+        return iterator.partitionLevelDeletion();
+    }
+
+    public Row staticRow()
+    {
+        maybeInit();
+        return iterator.staticRow();
+    }
+
+    public RowStats stats()
+    {
+        maybeInit();
+        return iterator.stats();
+    }
+
+    protected Unfiltered computeNext()
+    {
+        maybeInit();
+        return iterator.hasNext() ? iterator.next() : endOfData();
+    }
+
+    public void close()
+    {
+        if (iterator != null)
+            iterator.close();
+    }
+}
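
As a rough illustration of how the FilteringRowIterator hooks above are meant to be used (this sketch is not part of the patch; the class name and the Guava Predicate-based check are invented for the example), a subclass can keep only the rows matching an arbitrary predicate by overriding includeRow():

    import com.google.common.base.Predicate;

    import org.apache.cassandra.db.rows.*;

    // Hypothetical sketch, not part of this patch: drops every row that does
    // not satisfy the given predicate, relying solely on the includeRow() hook.
    public class PredicateFilteringRowIterator extends FilteringRowIterator
    {
        private final Predicate<Row> predicate;

        public PredicateFilteringRowIterator(UnfilteredRowIterator toFilter, Predicate<Row> predicate)
        {
            super(toFilter);
            this.predicate = predicate;
        }

        @Override
        protected boolean includeRow(Row row)
        {
            // Applied to the static row and to regular rows; range tombstone markers
            // and the partition-level deletion are left untouched.
            return predicate.apply(row);
        }
    }

Wrapping any UnfilteredRowIterator this way leaves its contract unchanged: rows for which the predicate returns false are simply skipped by hasNext(), while everything else passes through as-is.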

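Similarly, a minimal (hypothetical) subclass of LazilyInitializedUnfilteredRowIterator only has to provide initializeIterator(); the names below (SuppliedUnfilteredRowIterator and the java.util.function.Supplier source) are invented for the example and are not part of the patch:

    import java.util.function.Supplier;

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.db.rows.*;

    // Hypothetical sketch, not part of this patch: the wrapped iterator is only
    // built (via the supplier) once something beyond the partition key is requested.
    public class SuppliedUnfilteredRowIterator extends LazilyInitializedUnfilteredRowIterator
    {
        private final Supplier<UnfilteredRowIterator> source;

        public SuppliedUnfilteredRowIterator(DecoratedKey key, Supplier<UnfilteredRowIterator> source)
        {
            super(key);
            this.source = source;
        }

        @Override
        protected UnfilteredRowIterator initializeIterator()
        {
            return source.get();
        }
    }

Callers that only ever look at partitionKey() therefore never pay for building the underlying iterator, which is the point of the class during partition range scans.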