Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 c3bc85641 -> 9658ee9fa
Fix some tests failure for CASSANDRA-9704 upgrade tests patch by bdeggleston; reviewed by slebresne for CASSANDRA-9893 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9658ee9f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9658ee9f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9658ee9f Branch: refs/heads/cassandra-3.0 Commit: 9658ee9fac43b6fd3b7726b7b49eb0046add8873 Parents: c3bc856 Author: Blake Eggleston <[email protected]> Authored: Fri Aug 21 19:35:04 2015 -0700 Committer: Sylvain Lebresne <[email protected]> Committed: Fri Sep 4 17:05:21 2015 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/db/LegacyLayout.java | 13 +- .../org/apache/cassandra/db/ReadCommand.java | 6 +- src/java/org/apache/cassandra/db/Slices.java | 3 +- .../cassandra/db/filter/ColumnFilter.java | 2 +- .../db/SinglePartitionSliceCommandTest.java | 182 +++++++++++++++++++ 5 files changed, 200 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9658ee9f/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index b6f6657..d73d9cb 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -853,6 +853,17 @@ public abstract class LegacyLayout }; } + private static boolean equalValues(ClusteringPrefix c1, ClusteringPrefix c2, ClusteringComparator comparator) + { + assert c1.size() == c2.size(); + for (int i = 0; i < c1.size(); i++) + { + if (comparator.compareComponent(i, c1.get(i), c2.get(i)) != 0) + return false; + } + return true; + } + private static Comparator<LegacyAtom> legacyAtomComparator(CFMetaData metadata) { return (o1, o2) -> @@ -864,7 +875,7 @@ public abstract class LegacyLayout ClusteringPrefix c2 = o2.clustering(); int clusteringComparison; - if (c1.size() != c2.size() || (o1.isCell() == o2.isCell())) + if (c1.size() != c2.size() || (o1.isCell() == o2.isCell()) || !equalValues(c1, c2, metadata.comparator)) { clusteringComparison = metadata.comparator.compare(c1, c2); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9658ee9f/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 0ccd229..5a10716 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -863,7 +863,7 @@ public abstract class ReadCommand implements ReadQuery PartitionColumns columns = filter.selects(Clustering.STATIC_CLUSTERING) ? metadata.partitionColumns() : metadata.partitionColumns().withoutStatics(); - return new ColumnFilter.Builder(metadata).addAll(columns).build(); + return ColumnFilter.selectionBuilder().addAll(columns).build(); } } @@ -1208,7 +1208,7 @@ public abstract class ReadCommand implements ReadQuery // fully specified. We need to handle those cases differently in 3.0. NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator); - ColumnFilter.Builder selectionBuilder = new ColumnFilter.Builder(metadata); + ColumnFilter.Builder selectionBuilder = ColumnFilter.selectionBuilder(); for (int i = 0; i < numCellNames; i++) { ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in); @@ -1287,7 +1287,7 @@ public abstract class ReadCommand implements ReadQuery PartitionColumns columns = selectsStatics ? metadata.partitionColumns() : metadata.partitionColumns().withoutStatics(); - ColumnFilter columnFilter = new ColumnFilter.Builder(metadata).addAll(columns).build(); + ColumnFilter columnFilter = ColumnFilter.selectionBuilder().addAll(columns).build(); boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics); DataLimits limits; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9658ee9f/src/java/org/apache/cassandra/db/Slices.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java index 9dd4a48..bde9d96 100644 --- a/src/java/org/apache/cassandra/db/Slices.java +++ b/src/java/org/apache/cassandra/db/Slices.java @@ -876,7 +876,8 @@ public abstract class Slices implements Iterable<Slice> public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter) { - return UnfilteredRowIterators.emptyIterator(iter.metadata(), iter.partitionKey(), iter.isReverseOrder()); + return UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), iter.staticRow(), + iter.partitionLevelDeletion(), iter.isReverseOrder()); } public Iterator<Slice> iterator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9658ee9f/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index 9205ff9..29b3164 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -246,7 +246,7 @@ public class ColumnFilter private PartitionColumns.Builder selection; private List<ColumnSubselection> subSelections; - public Builder(CFMetaData metadata) + private Builder(CFMetaData metadata) { this.metadata = metadata; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9658ee9f/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java new file mode 100644 index 0000000..15b566e --- /dev/null +++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java @@ -0,0 +1,182 @@ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +public class SinglePartitionSliceCommandTest +{ + private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class); + + private static final String KEYSPACE = "ks"; + private static final String TABLE = "tbl"; + + private static CFMetaData cfm; + private static ColumnDefinition v; + private static ColumnDefinition s; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + cfm = CFMetaData.Builder.create(KEYSPACE, TABLE) + .addPartitionKey("k", UTF8Type.instance) + .addStaticColumn("s", UTF8Type.instance) + .addClusteringColumn("i", IntegerType.instance) + .addRegularColumn("v", UTF8Type.instance) + .build(); + + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm); + cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + v = cfm.getColumnDefinition(new ColumnIdentifier("v", true)); + s = cfm.getColumnDefinition(new ColumnIdentifier("s", true)); + } + + @Test + public void staticColumnsAreFiltered() throws IOException + { + DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k")); + + UntypedResultSet rows; + + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s, i, v) VALUES ('k', 's', 0, 'v')"); + QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE k='k' AND i=0"); + QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE k='k' AND i=0"); + rows = QueryProcessor.executeInternal("SELECT * FROM ks.tbl WHERE k='k' AND i=0"); + + for (UntypedResultSet.Row row: rows) + { + logger.debug("Current: k={}, s={}, v={}", (row.has("k") ? row.getString("k") : null), (row.has("s") ? row.getString("s") : null), (row.has("v") ? row.getString("v") : null)); + } + + assert rows.isEmpty(); + + ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v)); + ByteBuffer zero = ByteBufferUtil.bytes(0); + Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero))); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false); + ReadCommand cmd = new SinglePartitionSliceCommand(false, MessagingService.VERSION_30, true, cfm, + FBUtilities.nowInSeconds(), + columnFilter, + RowFilter.NONE, + DataLimits.NONE, + key, + sliceFilter); + + DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21)); + ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21); + DataInputPlus in = new ByteBufferDataInput(out.buffer(), null, 0, 0); + cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21); + + logger.debug("ReadCommand: {}", cmd); + UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup()); + ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd.columnFilter()); + + logger.debug("creating response: {}", response); + partitionIterator = response.makeIterator(cfm, null); // <- cmd is null + assert partitionIterator.hasNext(); + UnfilteredRowIterator partition = partitionIterator.next(); + + LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(partition); + Assert.assertEquals(Collections.emptyList(), rowIter.cells); + } + + private void checkForS(UnfilteredPartitionIterator pi) + { + Assert.assertTrue(pi.toString(), pi.hasNext()); + UnfilteredRowIterator ri = pi.next(); + Assert.assertTrue(ri.columns().contains(s)); + Row staticRow = ri.staticRow(); + Iterator<Cell> cellIterator = staticRow.cells().iterator(); + Assert.assertTrue(staticRow.toString(cfm, true), cellIterator.hasNext()); + Cell cell = cellIterator.next(); + Assert.assertEquals(s, cell.column()); + Assert.assertEquals(ByteBufferUtil.bytesToHex(cell.value()), ByteBufferUtil.bytes("s"), cell.value()); + Assert.assertFalse(cellIterator.hasNext()); + } + + @Test + public void staticColumnsAreReturned() throws IOException + { + DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k")); + + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k', 's')"); + Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k'").isEmpty()); + + ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s)); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false); + ReadCommand cmd = new SinglePartitionSliceCommand(false, MessagingService.VERSION_30, true, cfm, + FBUtilities.nowInSeconds(), + columnFilter, + RowFilter.NONE, + DataLimits.NONE, + key, + sliceFilter); + + UnfilteredPartitionIterator pi; + + // check raw iterator for static cell + pi = cmd.executeLocally(ReadOrderGroup.emptyGroup()); + checkForS(pi); + + ReadResponse response; + DataOutputBuffer out; + DataInputPlus in; + ReadResponse dst; + + // check (de)serialized iterator for memtable static cell + pi = cmd.executeLocally(ReadOrderGroup.emptyGroup()); + response = ReadResponse.createDataResponse(pi, cmd.columnFilter()); + out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); + ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); + in = new ByteBufferDataInput(out.buffer(), null, 0, 0); + dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); + pi = dst.makeIterator(cfm, cmd); + checkForS(pi); + + // check (de)serialized iterator for sstable static cell + Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush(); + pi = cmd.executeLocally(ReadOrderGroup.emptyGroup()); + response = ReadResponse.createDataResponse(pi, cmd.columnFilter()); + out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); + ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); + in = new ByteBufferDataInput(out.buffer(), null, 0, 0); + dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); + pi = dst.makeIterator(cfm, cmd); + checkForS(pi); + } +}
