Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 d68325bfd -> b263af93a refs/heads/trunk e90ec26b6 -> ac3b1cc2c
Introduce unit tests for Rows, Cells, and DataResolver Fix Rows.diff complex deletion resolution patch by blake; reviewed by benedict for CASSANDRA-10266 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b263af93 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b263af93 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b263af93 Branch: refs/heads/cassandra-3.0 Commit: b263af93a2899cf12ee5f35f0518460683fdac18 Parents: d68325b Author: Blake Eggleston <[email protected]> Authored: Fri Sep 11 08:32:02 2015 -0700 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Sep 15 17:49:45 2015 +0100 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/rows/Rows.java | 32 +- test/unit/org/apache/cassandra/db/CellTest.java | 48 +- .../apache/cassandra/db/rows/RowBuilder.java | 85 +++ .../org/apache/cassandra/db/rows/RowsTest.java | 548 +++++++++++++++++++ .../cassandra/service/DataResolverTest.java | 225 +++++++- 5 files changed, 927 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index c3b4a92..ea2ca06 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -80,7 +80,7 @@ public abstract class Rows { ++columnCount; ++cellCount; - Cells.collectStats((Cell)cd, collector); + Cells.collectStats((Cell) cd, collector); } else { @@ -105,11 +105,13 @@ public abstract class Rows /** * Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between * each input and {@code merged} to {@code diffListener}. + * <p> + * Note that this method doesn't only emit cells etc where there's a difference. The listener is informed + * of every corresponding entity between the merged and input rows, including those that are equal. * + * @param diffListener the listener to which to signal the differences between the inputs and the merged result. * @param merged the result of merging {@code inputs}. * @param inputs the inputs whose merge yielded {@code merged}. - * @param diffListener the listener to which to signal the differences between the inputs and the merged - * result. */ public static void diff(RowDiffListener diffListener, Row merged, Row...inputs) { @@ -179,6 +181,10 @@ public abstract class Rows } else { + + if (!mergedData.complexDeletion().isLive() || !inputData.complexDeletion().isLive()) + diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), inputData.complexDeletion()); + PeekingIterator<Cell> mergedCells = Iterators.peekingIterator(mergedData.iterator()); PeekingIterator<Cell> inputCells = Iterators.peekingIterator(inputData.iterator()); while (mergedCells.hasNext() && inputCells.hasNext()) @@ -221,8 +227,24 @@ public abstract class Rows return builder.build(); } - // Merge rows in memtable - // Return the minimum timestamp delta between existing and update + /** + * Merges two rows into the given builder, mainly for merging memtable rows. In addition to reconciling the cells + * in each row, the liveness info, and deletion times for the row and complex columns are also merged. + * <p> + * Note that this method assumes that the provided rows can meaningfully be reconciled together. That is, + * that the rows share the same clustering value, and belong to the same partition. + * + * @param existing + * @param update + * @param builder the row build to which the result of the reconciliation is written. + * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). + * + * @return the smallest timestamp delta between corresponding rows from existing and update. A + * timestamp delta being computed as the difference between the cells and DeletionTimes from {@code existing} + * and those in {@code existing}. + */ public static long merge(Row existing, Row update, Row.Builder builder, http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/CellTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java index e8cb1cb..5953255 100644 --- a/test/unit/org/apache/cassandra/db/CellTest.java +++ b/test/unit/org/apache/cassandra/db/CellTest.java @@ -19,6 +19,9 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; +import java.util.List; + +import com.google.common.collect.Lists; import junit.framework.Assert; import org.junit.BeforeClass; @@ -26,6 +29,9 @@ import org.junit.Test; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.SchemaLoader; @@ -37,16 +43,21 @@ public class CellTest { private static final String KEYSPACE1 = "CellTest"; private static final String CF_STANDARD1 = "Standard1"; + private static final String CF_COLLECTION = "Collection1"; - private CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1); + private static final CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1); + private static final CFMetaData cfm2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION) + .addPartitionKey("k", IntegerType.instance) + .addClusteringColumn("c", IntegerType.instance) + .addRegularColumn("v", IntegerType.instance) + .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)) + .build(); @BeforeClass public static void defineSchema() throws ConfigurationException { SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1)); + SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), cfm, cfm2); } @Test @@ -90,6 +101,35 @@ public class CellTest Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 2)); } + private static ByteBuffer bb(int i) + { + return ByteBufferUtil.bytes(i); + } + + @Test + public void testComplexCellReconcile() + { + ColumnDefinition m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false)); + int now1 = FBUtilities.nowInSeconds(); + long ts1 = now1*1000000; + + + Cell r1m1 = BufferCell.live(cfm2, m, ts1, bb(1), CellPath.create(bb(1))); + Cell r1m2 = BufferCell.live(cfm2, m, ts1, bb(2), CellPath.create(bb(2))); + List<Cell> cells1 = Lists.newArrayList(r1m1, r1m2); + + int now2 = now1 + 1; + long ts2 = now2*1000000; + Cell r2m2 = BufferCell.live(cfm2, m, ts2, bb(1), CellPath.create(bb(2))); + Cell r2m3 = BufferCell.live(cfm2, m, ts2, bb(2), CellPath.create(bb(3))); + Cell r2m4 = BufferCell.live(cfm2, m, ts2, bb(3), CellPath.create(bb(4))); + List<Cell> cells2 = Lists.newArrayList(r2m2, r2m3, r2m4); + + RowBuilder builder = new RowBuilder(); + Cells.reconcileComplex(m, cells1.iterator(), cells2.iterator(), DeletionTime.LIVE, builder, now2 + 1); + Assert.assertEquals(Lists.newArrayList(r1m1, r2m2, r2m3, r2m4), builder.cells); + } + private int testExpiring(String n1, String v1, long t1, int et1, String n2, String v2, Long t2, Integer et2) { if (n2 == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/rows/RowBuilder.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/RowBuilder.java b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java new file mode 100644 index 0000000..caa5c40 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.utils.Pair; + +/** + * Instrumented Builder implementation for testing the + * behavior of Cells and Rows static methods + */ +public class RowBuilder implements Row.Builder +{ + public List<Cell> cells = new LinkedList<>(); + public Clustering clustering = null; + public LivenessInfo livenessInfo = null; + public DeletionTime deletionTime = null; + public List<Pair<ColumnDefinition, DeletionTime>> complexDeletions = new LinkedList<>(); + + public void addCell(Cell cell) + { + cells.add(cell); + } + + public boolean isSorted() + { + throw new UnsupportedOperationException(); + } + + public void newRow(Clustering clustering) + { + assert this.clustering == null; + this.clustering = clustering; + } + + public Clustering clustering() + { + return clustering; + } + + public void addPrimaryKeyLivenessInfo(LivenessInfo info) + { + assert livenessInfo == null; + livenessInfo = info; + } + + public void addRowDeletion(DeletionTime deletion) + { + assert deletionTime == null; + deletionTime = deletion; + } + + + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) + { + complexDeletions.add(Pair.create(column, complexDeletion)); + } + + public Row build() + { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/rows/RowsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java new file mode 100644 index 0000000..306d687 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +public class RowsTest +{ + private static final String KEYSPACE = "rows_test"; + private static final String KCVM_TABLE = "kcvm"; + private static final CFMetaData kcvm; + private static final ColumnDefinition v; + private static final ColumnDefinition m; + private static final Clustering c1; + + static + { + kcvm = CFMetaData.Builder.create(KEYSPACE, KCVM_TABLE) + .addPartitionKey("k", IntegerType.instance) + .addClusteringColumn("c", IntegerType.instance) + .addRegularColumn("v", IntegerType.instance) + .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)) + .build(); + + v = kcvm.getColumnDefinition(new ColumnIdentifier("v", false)); + m = kcvm.getColumnDefinition(new ColumnIdentifier("m", false)); + c1 = kcvm.comparator.make(BigInteger.valueOf(1)); + } + + private static final ByteBuffer BB1 = ByteBufferUtil.bytes(1); + private static final ByteBuffer BB2 = ByteBufferUtil.bytes(2); + private static final ByteBuffer BB3 = ByteBufferUtil.bytes(3); + private static final ByteBuffer BB4 = ByteBufferUtil.bytes(4); + + private static class MergedPair<T> + { + public final int idx; + public final T merged; + public final T original; + + private MergedPair(int idx, T merged, T original) + { + this.idx = idx; + this.merged = merged; + this.original = original; + } + + static <T> MergedPair<T> create(int i, T m, T o) + { + return new MergedPair<>(i, m, o); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MergedPair<?> that = (MergedPair<?>) o; + + if (idx != that.idx) return false; + if (merged != null ? !merged.equals(that.merged) : that.merged != null) return false; + return !(original != null ? !original.equals(that.original) : that.original != null); + } + + public int hashCode() + { + int result = idx; + result = 31 * result + (merged != null ? merged.hashCode() : 0); + result = 31 * result + (original != null ? original.hashCode() : 0); + return result; + } + + public String toString() + { + return "MergedPair{" + + "idx=" + idx + + ", merged=" + merged + + ", original=" + original + + '}'; + } + } + + private static class DiffListener implements RowDiffListener + { + int updates = 0; + Clustering clustering = null; + + private void updateClustering(Clustering c) + { + assert clustering == null || clustering == c; + clustering = c; + } + + List<MergedPair<Cell>> cells = new LinkedList<>(); + public void onCell(int i, Clustering clustering, Cell merged, Cell original) + { + updateClustering(clustering); + cells.add(MergedPair.create(i, merged, original)); + updates++; + } + + List<MergedPair<LivenessInfo>> liveness = new LinkedList<>(); + public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) + { + updateClustering(clustering); + liveness.add(MergedPair.create(i, merged, original)); + updates++; + } + + List<MergedPair<DeletionTime>> deletions = new LinkedList<>(); + public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original) + { + updateClustering(clustering); + deletions.add(MergedPair.create(i, merged, original)); + updates++; + } + + Map<ColumnDefinition, List<MergedPair<DeletionTime>>> complexDeletions = new HashMap<>(); + public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original) + { + updateClustering(clustering); + if (!complexDeletions.containsKey(column)) complexDeletions.put(column, new LinkedList<>()); + complexDeletions.get(column).add(MergedPair.create(i, merged, original)); + updates++; + } + } + + public static class StatsCollector implements PartitionStatisticsCollector + { + List<Cell> cells = new LinkedList<>(); + public void update(Cell cell) + { + cells.add(cell); + } + + List<LivenessInfo> liveness = new LinkedList<>(); + public void update(LivenessInfo info) + { + liveness.add(info); + } + + List<DeletionTime> deletions = new LinkedList<>(); + public void update(DeletionTime deletion) + { + deletions.add(deletion); + } + + long columnCount = -1; + public void updateColumnSetPerRow(long columnSetInRow) + { + assert columnCount < 0; + this.columnCount = columnSetInRow; + } + + boolean hasLegacyCounterShards = false; + public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards) + { + this.hasLegacyCounterShards |= hasLegacyCounterShards; + } + } + + private static long secondToTs(int now) + { + return now * 1000000; + } + + private static Row.Builder createBuilder(Clustering c, int now, ByteBuffer vVal, ByteBuffer mKey, ByteBuffer mVal) + { + long ts = secondToTs(now); + Row.Builder builder = BTreeRow.unsortedBuilder(now); + builder.newRow(c); + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(kcvm, ts, now)); + if (vVal != null) + { + builder.addCell(BufferCell.live(kcvm, v, ts, vVal)); + } + if (mKey != null && mVal != null) + { + builder.addComplexDeletion(m, new DeletionTime(ts - 1, now)); + builder.addCell(BufferCell.live(kcvm, m, ts, mVal, CellPath.create(mKey))); + } + + return builder; + } + + @Test + public void copy() + { + int now = FBUtilities.nowInSeconds(); + long ts = secondToTs(now); + Row.Builder originalBuilder = BTreeRow.unsortedBuilder(now); + originalBuilder.newRow(c1); + LivenessInfo liveness = LivenessInfo.create(kcvm, ts, now); + originalBuilder.addPrimaryKeyLivenessInfo(liveness); + DeletionTime complexDeletion = new DeletionTime(ts-1, now); + originalBuilder.addComplexDeletion(m, complexDeletion); + List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, secondToTs(now), BB1), + BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)), + BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2))); + expectedCells.forEach(originalBuilder::addCell); + DeletionTime rowDeletion = new DeletionTime(ts, now); + originalBuilder.addRowDeletion(rowDeletion); + + RowBuilder builder = new RowBuilder(); + Rows.copy(originalBuilder.build(), builder); + + Assert.assertEquals(c1, builder.clustering); + Assert.assertEquals(liveness, builder.livenessInfo); + Assert.assertEquals(rowDeletion, builder.deletionTime); + Assert.assertEquals(Lists.newArrayList(Pair.create(m, complexDeletion)), builder.complexDeletions); + Assert.assertEquals(Sets.newHashSet(expectedCells), Sets.newHashSet(builder.cells)); + } + + @Test + public void collectStats() + { + int now = FBUtilities.nowInSeconds(); + long ts = secondToTs(now); + Row.Builder builder = BTreeRow.unsortedBuilder(now); + builder.newRow(c1); + LivenessInfo liveness = LivenessInfo.create(kcvm, ts, now); + builder.addPrimaryKeyLivenessInfo(liveness); + DeletionTime complexDeletion = new DeletionTime(ts-1, now); + builder.addComplexDeletion(m, complexDeletion); + List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, ts, BB1), + BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)), + BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2))); + expectedCells.forEach(builder::addCell); + DeletionTime rowDeletion = new DeletionTime(ts, now); + builder.addRowDeletion(rowDeletion); + + StatsCollector collector = new StatsCollector(); + Rows.collectStats(builder.build(), collector); + + Assert.assertEquals(Lists.newArrayList(liveness), collector.liveness); + Assert.assertEquals(Sets.newHashSet(rowDeletion, complexDeletion), Sets.newHashSet(collector.deletions)); + Assert.assertEquals(Sets.newHashSet(expectedCells), Sets.newHashSet(collector.cells)); + Assert.assertEquals(2, collector.columnCount); + Assert.assertFalse(collector.hasLegacyCounterShards); + } + + + public static void addExpectedCells(Set<MergedPair<Cell>> dst, Cell merged, Cell... inputs) + { + for (int i=0; i<inputs.length; i++) + { + dst.add(MergedPair.create(i, merged, inputs[i])); + } + } + + @Test + public void diff() + { + int now1 = FBUtilities.nowInSeconds(); + long ts1 = secondToTs(now1); + Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1); + r1Builder.newRow(c1); + LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1); + r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); + DeletionTime r1ComplexDeletion = new DeletionTime(ts1-1, now1); + r1Builder.addComplexDeletion(m, r1ComplexDeletion); + + Cell r1v = BufferCell.live(kcvm, v, ts1, BB1); + Cell r1m1 = BufferCell.live(kcvm, m, ts1, BB1, CellPath.create(BB1)); + Cell r1m2 = BufferCell.live(kcvm, m, ts1, BB2, CellPath.create(BB2)); + List<Cell> r1ExpectedCells = Lists.newArrayList(r1v, r1m1, r1m2); + + r1ExpectedCells.forEach(r1Builder::addCell); + + int now2 = now1 + 1; + long ts2 = secondToTs(now2); + Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2); + r2Builder.newRow(c1); + LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2); + r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); + Cell r2v = BufferCell.live(kcvm, v, ts2, BB2); + Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2)); + Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3)); + Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4)); + List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4); + + r2ExpectedCells.forEach(r2Builder::addCell); + DeletionTime r2RowDeletion = new DeletionTime(ts1 - 2, now2); + r2Builder.addRowDeletion(r2RowDeletion); + + Row r1 = r1Builder.build(); + Row r2 = r2Builder.build(); + Row merged = Rows.merge(r1, r2, now2 + 1); + + Assert.assertEquals(r1ComplexDeletion, merged.getComplexColumnData(m).complexDeletion()); + + DiffListener listener = new DiffListener(); + Rows.diff(listener, merged, r1, r2); + + Assert.assertEquals(c1, listener.clustering); + + // check cells + Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(); + addExpectedCells(expectedCells, r2v, r1v, r2v); // v + addExpectedCells(expectedCells, r1m1, r1m1, null); // m[1] + addExpectedCells(expectedCells, r2m2, r1m2, r2m2); // m[2] + addExpectedCells(expectedCells, r2m3, null, r2m3); // m[3] + addExpectedCells(expectedCells, r2m4, null, r2m4); // m[4] + + Assert.assertEquals(expectedCells.size(), listener.cells.size()); + Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells)); + + // liveness + List<MergedPair<LivenessInfo>> expectedLiveness = Lists.newArrayList(MergedPair.create(0, r2Liveness, r1Liveness), + MergedPair.create(1, r2Liveness, r2Liveness)); + Assert.assertEquals(expectedLiveness, listener.liveness); + + // deletions + List<MergedPair<DeletionTime>> expectedDeletions = Lists.newArrayList(MergedPair.create(0, r2RowDeletion, null), + MergedPair.create(1, r2RowDeletion, r2RowDeletion)); + Assert.assertEquals(expectedDeletions, listener.deletions); + + // complex deletions + List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, r1ComplexDeletion, r1ComplexDeletion), + MergedPair.create(1, r1ComplexDeletion, DeletionTime.LIVE)); + Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions); + } + + /** + * merged row has no column data + */ + @Test + public void diffEmptyMerged() + { + int now1 = FBUtilities.nowInSeconds(); + long ts1 = secondToTs(now1); + Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1); + r1Builder.newRow(c1); + LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1); + r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); + + // mergedData == null + int now2 = now1 + 1; + long ts2 = secondToTs(now2); + Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2); + r2Builder.newRow(c1); + LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2); + r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); + DeletionTime r2ComplexDeletion = new DeletionTime(ts2-1, now2); + r2Builder.addComplexDeletion(m, r2ComplexDeletion); + Cell r2v = BufferCell.live(kcvm, v, ts2, BB2); + Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2)); + Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3)); + Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4)); + List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4); + + r2ExpectedCells.forEach(r2Builder::addCell); + DeletionTime r2RowDeletion = new DeletionTime(ts1 - 1, now2); + r2Builder.addRowDeletion(r2RowDeletion); + + Row r1 = r1Builder.build(); + Row r2 = r2Builder.build(); + + DiffListener listener = new DiffListener(); + Rows.diff(listener, r1, r2); + + Assert.assertEquals(c1, listener.clustering); + + // check cells + Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(MergedPair.create(0, null, r2v), // v + MergedPair.create(0, null, r2m2), // m[2] + MergedPair.create(0, null, r2m3), // m[3] + MergedPair.create(0, null, r2m4)); // m[4] + + Assert.assertEquals(expectedCells.size(), listener.cells.size()); + Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells)); + + // complex deletions + List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, null, r2ComplexDeletion)); + Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions); + } + + /** + * input row has no column data + */ + @Test + public void diffEmptyInput() + { + int now1 = FBUtilities.nowInSeconds(); + long ts1 = secondToTs(now1); + Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1); + r1Builder.newRow(c1); + LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1); + r1Builder.addPrimaryKeyLivenessInfo(r1Liveness); + + // mergedData == null + int now2 = now1 + 1; + long ts2 = secondToTs(now2); + Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2); + r2Builder.newRow(c1); + LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2); + r2Builder.addPrimaryKeyLivenessInfo(r2Liveness); + DeletionTime r2ComplexDeletion = new DeletionTime(ts2-1, now2); + r2Builder.addComplexDeletion(m, r2ComplexDeletion); + Cell r2v = BufferCell.live(kcvm, v, ts2, BB2); + Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2)); + Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3)); + Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4)); + List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4); + + r2ExpectedCells.forEach(r2Builder::addCell); + DeletionTime r2RowDeletion = new DeletionTime(ts1 - 1, now2); + r2Builder.addRowDeletion(r2RowDeletion); + + Row r1 = r1Builder.build(); + Row r2 = r2Builder.build(); + + DiffListener listener = new DiffListener(); + Rows.diff(listener, r2, r1); + + Assert.assertEquals(c1, listener.clustering); + + // check cells + Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(MergedPair.create(0, r2v, null), // v + MergedPair.create(0, r2m2, null), // m[2] + MergedPair.create(0, r2m3, null), // m[3] + MergedPair.create(0, r2m4, null)); // m[4] + + Assert.assertEquals(expectedCells.size(), listener.cells.size()); + Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells)); + + // complex deletions + List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, r2ComplexDeletion, null)); + Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions); + } + + @Test + public void merge() + { + int now1 = FBUtilities.nowInSeconds(); + Row.Builder existingBuilder = createBuilder(c1, now1, BB1, BB1, BB1); + + int now2 = now1 + 1; + long ts2 = secondToTs(now2); + + Cell expectedVCell = BufferCell.live(kcvm, v, ts2, BB2); + Cell expectedMCell = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB1)); + DeletionTime expectedComplexDeletionTime = new DeletionTime(ts2 - 1, now2); + + Row.Builder updateBuilder = createBuilder(c1, now2, null, null, null); + updateBuilder.addCell(expectedVCell); + updateBuilder.addComplexDeletion(m, expectedComplexDeletionTime); + updateBuilder.addCell(expectedMCell); + + RowBuilder builder = new RowBuilder(); + long td = Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now2 + 1); + + Assert.assertEquals(c1, builder.clustering); + Assert.assertEquals(LivenessInfo.create(kcvm, ts2, now2), builder.livenessInfo); + Assert.assertEquals(Lists.newArrayList(Pair.create(m, new DeletionTime(ts2-1, now2))), builder.complexDeletions); + + Assert.assertEquals(2, builder.cells.size()); + Assert.assertEquals(Lists.newArrayList(expectedVCell, expectedMCell), Lists.newArrayList(builder.cells)); + Assert.assertEquals(ts2 - secondToTs(now1), td); + } + + @Test + public void mergeComplexDeletionSupersededByRowDeletion() + { + int now1 = FBUtilities.nowInSeconds(); + Row.Builder existingBuilder = createBuilder(c1, now1, null, null, null); + + int now2 = now1 + 1; + Row.Builder updateBuilder = createBuilder(c1, now2, null, BB1, BB1); + int now3 = now2 + 1; + DeletionTime expectedDeletion = new DeletionTime(secondToTs(now3), now3); + updateBuilder.addRowDeletion(expectedDeletion); + + RowBuilder builder = new RowBuilder(); + Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now3 + 1); + + Assert.assertEquals(expectedDeletion, builder.deletionTime); + Assert.assertEquals(Collections.emptyList(), builder.complexDeletions); + Assert.assertEquals(Collections.emptyList(), builder.cells); + } + + /** + * If a row's deletion time deletes a row's liveness info, the new row should have it's + * liveness info set to empty + */ + @Test + public void mergeRowDeletionSupercedesLiveness() + { + int now1 = FBUtilities.nowInSeconds(); + Row.Builder existingBuilder = createBuilder(c1, now1, null, null, null); + + int now2 = now1 + 1; + Row.Builder updateBuilder = createBuilder(c1, now2, BB1, BB1, BB1); + int now3 = now2 + 1; + DeletionTime expectedDeletion = new DeletionTime(secondToTs(now3), now3); + updateBuilder.addRowDeletion(expectedDeletion); + + RowBuilder builder = new RowBuilder(); + Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now3 + 1); + + Assert.assertEquals(expectedDeletion, builder.deletionTime); + Assert.assertEquals(LivenessInfo.EMPTY, builder.livenessInfo); + Assert.assertEquals(Collections.emptyList(), builder.complexDeletions); + Assert.assertEquals(Collections.emptyList(), builder.cells); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index 0804bfb..b60a039 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -20,15 +20,22 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; import org.junit.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; @@ -36,6 +43,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.net.*; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.Util.assertClustering; @@ -50,6 +58,7 @@ public class DataResolverTest { public static final String KEYSPACE1 = "DataResolverTest"; public static final String CF_STANDARD = "Standard1"; + public static final String CF_COLLECTION = "Collection1"; // counter to generate the last byte of the respondent's address in a ReadResponse message private int addressSuffix = 10; @@ -57,7 +66,10 @@ public class DataResolverTest private DecoratedKey dk; private Keyspace ks; private ColumnFamilyStore cfs; + private ColumnFamilyStore cfs2; private CFMetaData cfm; + private CFMetaData cfm2; + private ColumnDefinition m; private int nowInSec; private ReadCommand command; private MessageRecorder messageRecorder; @@ -74,10 +86,15 @@ public class DataResolverTest .addRegularColumn("one", AsciiType.instance) .addRegularColumn("two", AsciiType.instance) .build(); + + CFMetaData cfMetaData2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION) + .addPartitionKey("k", ByteType.instance) + .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)) + .build(); SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), - cfMetadata); + cfMetadata, cfMetaData2); } @Before @@ -87,6 +104,10 @@ public class DataResolverTest ks = Keyspace.open(KEYSPACE1); cfs = ks.getColumnFamilyStore(CF_STANDARD); cfm = cfs.metadata; + cfs2 = ks.getColumnFamilyStore(CF_COLLECTION); + cfm2 = cfs2.metadata; + m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false)); + nowInSec = FBUtilities.nowInSeconds(); command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build(); } @@ -419,6 +440,200 @@ public class DataResolverTest assertRepairContainsColumn(msg, "1", "two", "B", 3); } + private static ByteBuffer bb(int b) + { + return ByteBufferUtil.bytes(b); + } + + private Cell mapCell(int k, int v, long ts) + { + return BufferCell.live(cfm2, m, ts, bb(v), CellPath.create(bb(k))); + } + + @Test + public void testResolveComplexDelete() + { + ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + + long[] ts = {100, 200}; + + Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec); + builder.newRow(Clustering.EMPTY); + builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); + builder.addCell(mapCell(0, 0, ts[0])); + + InetAddress peer1 = peer(); + resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + builder.newRow(Clustering.EMPTY); + DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec); + builder.addComplexDeletion(m, expectedCmplxDelete); + Cell expectedCell = mapCell(1, 1, ts[1]); + builder.addCell(expectedCell); + + InetAddress peer2 = peer(); + resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "m"); + Assert.assertNull(row.getCell(m, CellPath.create(bb(0)))); + Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1)))); + } + + MessageOut<Mutation> msg; + msg = getSentMessage(peer1); + Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator(); + assertTrue(rowIter.hasNext()); + Row row = rowIter.next(); + assertFalse(rowIter.hasNext()); + + ComplexColumnData cd = row.getComplexColumnData(m); + + assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + assertEquals(expectedCmplxDelete, cd.complexDeletion()); + + Assert.assertNull(messageRecorder.sent.get(peer2)); + } + + @Test + public void testResolveDeletedCollection() + { + + ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + + long[] ts = {100, 200}; + + Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec); + builder.newRow(Clustering.EMPTY); + builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); + builder.addCell(mapCell(0, 0, ts[0])); + + InetAddress peer1 = peer(); + resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + builder.newRow(Clustering.EMPTY); + DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec); + builder.addComplexDeletion(m, expectedCmplxDelete); + + InetAddress peer2 = peer(); + resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + try(PartitionIterator data = resolver.resolve()) + { + assertFalse(data.hasNext()); + } + + MessageOut<Mutation> msg; + msg = getSentMessage(peer1); + Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator(); + assertTrue(rowIter.hasNext()); + Row row = rowIter.next(); + assertFalse(rowIter.hasNext()); + + ComplexColumnData cd = row.getComplexColumnData(m); + + assertEquals(Collections.emptySet(), Sets.newHashSet(cd)); + assertEquals(expectedCmplxDelete, cd.complexDeletion()); + + Assert.assertNull(messageRecorder.sent.get(peer2)); + } + + @Test + public void testResolveNewCollection() + { + ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + + long[] ts = {100, 200}; + + // map column + Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec); + builder.newRow(Clustering.EMPTY); + DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec); + builder.addComplexDeletion(m, expectedCmplxDelete); + Cell expectedCell = mapCell(0, 0, ts[0]); + builder.addCell(expectedCell); + + // empty map column + InetAddress peer1 = peer(); + resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + InetAddress peer2 = peer(); + resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk)))); + + try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "m"); + ComplexColumnData cd = row.getComplexColumnData(m); + assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + } + + Assert.assertNull(messageRecorder.sent.get(peer1)); + + MessageOut<Mutation> msg; + msg = getSentMessage(peer2); + Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator(); + assertTrue(rowIter.hasNext()); + Row row = rowIter.next(); + assertFalse(rowIter.hasNext()); + + ComplexColumnData cd = row.getComplexColumnData(m); + + assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd)); + assertEquals(expectedCmplxDelete, cd.complexDeletion()); + } + + @Test + public void testResolveNewCollectionOverwritingDeleted() + { + ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); + DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2); + + long[] ts = {100, 200}; + + // cleared map column + Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec); + builder.newRow(Clustering.EMPTY); + builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); + + InetAddress peer1 = peer(); + resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + // newer, overwritten map column + builder.newRow(Clustering.EMPTY); + DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec); + builder.addComplexDeletion(m, expectedCmplxDelete); + Cell expectedCell = mapCell(1, 1, ts[1]); + builder.addCell(expectedCell); + + InetAddress peer2 = peer(); + resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); + + try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "m"); + ComplexColumnData cd = row.getComplexColumnData(m); + assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + } + + MessageOut<Mutation> msg; + msg = getSentMessage(peer1); + Row row = Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2.cfId).iterator()); + + ComplexColumnData cd = row.getComplexColumnData(m); + + assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + assertEquals(expectedCmplxDelete, cd.complexDeletion()); + + Assert.assertNull(messageRecorder.sent.get(peer2)); + } + private InetAddress peer() { try @@ -488,10 +703,16 @@ public class DataResolverTest assertEquals(update.metadata().cfName, cfm.cfName); } + public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator) { + return readResponseMessage(from, partitionIterator, command); + + } + public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd) + { return MessageIn.create(from, - ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter()), + ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()), Collections.EMPTY_MAP, MessagingService.Verb.REQUEST_RESPONSE, MessagingService.current_version);
