http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/ReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java index 3abffd5..d1f5272 100644 --- a/src/java/org/apache/cassandra/db/ReadQuery.java +++ b/src/java/org/apache/cassandra/db/ReadQuery.java @@ -33,7 +33,7 @@ import org.apache.cassandra.service.pager.PagingState; */ public interface ReadQuery { - public static final ReadQuery EMPTY = new ReadQuery() + ReadQuery EMPTY = new ReadQuery() { public ReadOrderGroup startOrderGroup() { @@ -67,6 +67,16 @@ public interface ReadQuery { return QueryPager.EMPTY; } + + public boolean selectsKey(DecoratedKey key) + { + return false; + } + + public boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + return false; + } }; /** @@ -116,4 +126,16 @@ public interface ReadQuery * @return The limits for the query. */ public DataLimits limits(); + + /** + * @return true if the read query would select the given key, including checks against any partition key + * restrictions in the row filter + */ + public boolean selectsKey(DecoratedKey key); + + /** + * @return true if the read query would select the given clustering, including checks against any clustering + * column restrictions in the row filter + */ + public boolean selectsClustering(DecoratedKey key, Clustering clustering); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 49cf07c..a8e37b4 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import com.google.common.collect.Iterables; import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; @@ -190,15 +191,23 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter return DatabaseDescriptor.getReadRpcTimeout(); } - public boolean selects(DecoratedKey partitionKey, Clustering clustering) + public boolean selectsKey(DecoratedKey key) { - if (!partitionKey().equals(partitionKey)) + if (!this.partitionKey().equals(key)) return false; + return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator()); + } + + public boolean selectsClustering(DecoratedKey key, Clustering clustering) + { if (clustering == Clustering.STATIC_CLUSTERING) return !columnFilter().fetchedColumns().statics.isEmpty(); - return clusteringIndexFilter().selects(clustering); + if (!clusteringIndexFilter().selects(clustering)) + return false; + + return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); } /** @@ -503,6 +512,16 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter return new MultiPartitionPager(this, pagingState, protocolVersion); } + public boolean selectsKey(DecoratedKey key) + { + return Iterables.any(commands, c -> c.selectsKey(key)); + } + + public boolean 
selectsClustering(DecoratedKey key, Clustering clustering) + { + return Iterables.any(commands, c -> c.selectsClustering(key, clustering)); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index b5968d5..0ff30af 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -115,6 +115,45 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec); /** + * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by + * the given key, false otherwise. + */ + public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, AbstractType<?> keyValidator) + { + for (Expression e : expressions) + { + if (!e.column.isPartitionKey()) + continue; + + ByteBuffer value = keyValidator instanceof CompositeType + ? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()] + : key.getKey(); + if (!e.operator().isSatisfiedBy(e.column.type, value, e.value)) + return false; + } + return true; + } + + /** + * Returns true if all of the expressions within this filter that apply to the clustering key are satisfied by + * the given Clustering, false otherwise. 
+ */ + public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering clustering) + { + for (Expression e : expressions) + { + if (!e.column.isClusteringColumn()) + continue; + + if (!e.operator().isSatisfiedBy(e.column.type, clustering.get(e.column.position()), e.value)) + { + return false; + } + } + return true; + } + + /** * Returns this filter but without the provided expression. This method * *assumes* that the filter contains the provided expression. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java index 6eb9071..46dc3fa 100644 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import com.google.common.base.MoreObjects; import com.google.common.collect.Iterables; import org.apache.cassandra.config.CFMetaData; @@ -94,6 +95,18 @@ public class TemporalRow this.isNew = isNew; } + @Override + public String toString() + { + return MoreObjects.toStringHelper(this) + .add("value", value == null ? "null" : ByteBufferUtil.bytesToHex(value)) + .add("timestamp", timestamp) + .add("ttl", ttl) + .add("localDeletionTime", localDeletionTime) + .add("isNew", isNew) + .toString(); + } + public TemporalCell reconcile(TemporalCell that) { int now = FBUtilities.nowInSeconds(); @@ -208,13 +221,13 @@ public class TemporalRow if (cell.isNew) { - assert newCell == null || newCell.equals(cell) : "Only one cell version can be marked New"; + assert newCell == null || newCell.equals(cell) : "Only one cell version can be marked New; newCell: " + newCell + ", cell: " + cell; newCell = cell; numSet = existingCell == null ? 
1 : 2; } else { - assert existingCell == null || existingCell.equals(cell) : "Only one cell version can be marked Existing"; + assert existingCell == null || existingCell.equals(cell) : "Only one cell version can be marked Existing; existingCell: " + existingCell + ", cell: " + cell; existingCell = cell; numSet = newCell == null ? 1 : 2; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index 28ec489..0a7f747 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ b/src/java/org/apache/cassandra/db/view/View.java @@ -18,53 +18,30 @@ package org.apache.cassandra.db.view; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; +import java.util.stream.Collectors; import javax.annotation.Nullable; import com.google.common.collect.Iterables; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.ViewDefinition; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.*; +import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.AbstractReadCommandBuilder; import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder; -import org.apache.cassandra.db.CBuilder; -import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ColumnFamilyStore; -import 
org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletionInfo; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.LivenessInfo; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.db.ReadOrderGroup; -import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.Slice; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.partitions.AbstractBTreePartition; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.BTreeRow; -import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.db.rows.ColumnData; -import org.apache.cassandra.db.rows.ComplexColumnData; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.pager.QueryPager; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.FBUtilities; /** * A View copies data from a base table into a view table which can be queried independently from the @@ -111,6 +88,13 @@ public class View private final boolean includeAllColumns; private ViewBuilder builder; + // Only the raw statement can be final, because the statement cannot always be prepared when the MV is initialized. + // For example, during startup, this view will be initialized as part of the Keyspace.open() work; preparing a statement + // also requires the keyspace to be open, so this results in double-initialization problems. 
+ private final SelectStatement.RawStatement rawSelect; + private SelectStatement select; + private ReadQuery query; + public View(ViewDefinition definition, ColumnFamilyStore baseCfs) { @@ -120,6 +104,7 @@ public class View includeAllColumns = definition.includeAllColumns; viewHasAllPrimaryKeys = updateDefinition(definition); + this.rawSelect = definition.select; } public ViewDefinition getDefinition() @@ -205,9 +190,9 @@ public class View */ public boolean updateAffectsView(AbstractBTreePartition partition) { - // If we are including all of the columns, then any update will be included - if (includeAllColumns) - return true; + ReadQuery selectQuery = getReadQuery(); + if (!selectQuery.selectsKey(partition.partitionKey())) + return false; // If there are range tombstones, tombstones will also need to be generated for the view // This requires a query of the base rows and generating tombstones for all of those values @@ -217,7 +202,10 @@ public class View // Check each row for deletion or update for (Row row : partition) { - if (!row.deletion().isLive()) + if (!selectQuery.selectsClustering(partition.partitionKey(), row.clustering())) + continue; + + if (includeAllColumns || viewHasAllPrimaryKeys || !row.deletion().isLive()) return true; if (row.primaryKeyLivenessInfo().isLive(FBUtilities.nowInSeconds())) @@ -440,7 +428,7 @@ public class View if (!deletionInfo.getPartitionDeletion().isLive()) { - command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk); + command = getSelectStatement().internalReadForView(dk, rowSet.nowInSec); } else { @@ -459,11 +447,15 @@ public class View if (command == null) { + ReadQuery selectQuery = getReadQuery(); SinglePartitionSliceBuilder builder = null; for (Row row : partition) { if (!row.deletion().isLive()) { + if (!selectQuery.selectsClustering(rowSet.dk, row.clustering())) + continue; + if (builder == null) builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); 
builder.addSlice(Slice.make(row.clustering())); @@ -476,10 +468,10 @@ public class View if (command != null) { + ReadQuery selectQuery = getReadQuery(); + assert selectQuery.selectsKey(rowSet.dk); - //We may have already done this work for - //another MV update so check - + // We may have already done this work for another MV update so check if (!rowSet.hasTombstonedExisting()) { QueryPager pager = command.getPager(null, Server.CURRENT_VERSION); @@ -498,7 +490,8 @@ public class View while (rowIterator.hasNext()) { Row row = rowIterator.next(); - rowSet.addRow(row, false); + if (selectQuery.selectsClustering(rowSet.dk, row.clustering())) + rowSet.addRow(row, false); } } } @@ -609,6 +602,34 @@ public class View return rowSet; } + /** + * Returns the SelectStatement used to populate and filter this view. Internal users should access the select + * statement this way to ensure it has been prepared. + */ + public SelectStatement getSelectStatement() + { + if (select == null) + { + ClientState state = ClientState.forInternalCalls(); + state.setKeyspace(baseCfs.keyspace.getName()); + rawSelect.prepareKeyspace(state); + ParsedStatement.Prepared prepared = rawSelect.prepare(true); + select = (SelectStatement) prepared.statement; + } + + return select; + } + + /** + * Returns the ReadQuery used to filter this view. Internal users should access the query this way to ensure it + * has been prepared. + */ + public ReadQuery getReadQuery() + { + if (query == null) + query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds()); + return query; + } /** * @param isBuilding If the view is currently being built, we do not query the values which are already stored, @@ -683,4 +704,55 @@ public class View final UUID baseId = Schema.instance.getId(keyspace, baseTable); return Iterables.filter(ksm.views, view -> view.baseTableId.equals(baseId)); } + + /** + * Builds the string text for a materialized view's SELECT statement. 
+ */ + public static String buildSelectStatement(String cfName, Collection<ColumnDefinition> includedColumns, String whereClause) + { + StringBuilder rawSelect = new StringBuilder("SELECT "); + if (includedColumns == null || includedColumns.isEmpty()) + rawSelect.append("*"); + else + rawSelect.append(includedColumns.stream().map(id -> id.name.toCQLString()).collect(Collectors.joining(", "))); + rawSelect.append(" FROM \"").append(cfName).append("\" WHERE ") .append(whereClause).append(" ALLOW FILTERING"); + return rawSelect.toString(); + } + + public static String relationsToWhereClause(List<Relation> whereClause) + { + List<String> expressions = new ArrayList<>(whereClause.size()); + for (Relation rel : whereClause) + { + StringBuilder sb = new StringBuilder(); + + if (rel.isMultiColumn()) + { + sb.append(((MultiColumnRelation) rel).getEntities().stream() + .map(ColumnIdentifier.Raw::toCQLString) + .collect(Collectors.joining(", ", "(", ")"))); + } + else + { + sb.append(((SingleColumnRelation) rel).getEntity().toCQLString()); + } + + sb.append(" ").append(rel.operator()).append(" "); + + if (rel.isIN()) + { + sb.append(rel.getInValues().stream() + .map(Term.Raw::getText) + .collect(Collectors.joining(", ", "(", ")"))); + } + else + { + sb.append(rel.getValue().getText()); + } + + expressions.add(sb.toString()); + } + + return expressions.stream().collect(Collectors.joining(" AND ")); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index f0b01c7..0a0fe08 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -29,12 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.ReadOrderGroup; -import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; @@ -44,7 +39,6 @@ import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.StorageProxy; @@ -52,7 +46,6 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.pager.QueryPager; import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; @@ -77,7 +70,11 @@ public class ViewBuilder extends CompactionInfo.Holder private void buildKey(DecoratedKey key) { - QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null, Server.CURRENT_VERSION); + ReadQuery selectQuery = view.getReadQuery(); + if (!selectQuery.selectsKey(key)) + return; + + QueryPager pager = view.getSelectStatement().internalReadForView(key, FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION); while (!pager.isExhausted()) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java index 77867fc..f1751f5 100644 --- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java @@ -51,7 +51,7 @@ public class CompositesSearcher extends CassandraIndexSearcher private boolean isMatchingEntry(DecoratedKey partitionKey, IndexEntry entry, ReadCommand command) { - return command.selects(partitionKey, entry.indexedEntryClustering); + return command.selectsKey(partitionKey) && command.selectsClustering(partitionKey, entry.indexedEntryClustering); } protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey, http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index fb97ca5..5f27d82 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -35,14 +35,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.cql3.statements.SelectStatement; import 
org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.view.View; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -75,6 +75,7 @@ public final class SchemaKeyspace public static final String AGGREGATES = "aggregates"; public static final String INDEXES = "indexes"; + public static final List<String> ALL = ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES); @@ -155,6 +156,7 @@ public final class SchemaKeyspace + "view_name text," + "base_table_id uuid," + "base_table_name text," + + "where_clause text," + "bloom_filter_fp_chance double," + "caching frozen<map<text, text>>," + "comment text," @@ -1311,6 +1313,7 @@ public final class SchemaKeyspace builder.add("include_all_columns", view.includeAllColumns) .add("base_table_id", view.baseTableId) .add("base_table_name", view.baseTableMetadata().cfName) + .add("where_clause", view.whereClause) .add("id", table.cfId); addTableParamsToSchemaMutation(table.params, builder); @@ -1426,7 +1429,9 @@ public final class SchemaKeyspace String view = row.getString("view_name"); UUID id = row.getUUID("id"); UUID baseTableId = row.getUUID("base_table_id"); + String baseTableName = row.getString("base_table_name"); boolean includeAll = row.getBoolean("include_all_columns"); + String whereClause = row.getString("where_clause"); List<ColumnDefinition> columns = readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition); @@ -1447,7 +1452,10 @@ public final class SchemaKeyspace .params(createTableParamsFromRow(row)) .droppedColumns(droppedColumns); - return new ViewDefinition(keyspace, view, baseTableId, includeAll, cfm); + String rawSelect = 
View.buildSelectStatement(baseTableName, columns, whereClause); + SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect); + + return new ViewDefinition(keyspace, view, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm); } /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 61e4fc2..e92563b 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import com.datastax.driver.core.*; import com.datastax.driver.core.ResultSet; @@ -790,6 +791,83 @@ public abstract class CQLTester Assert.assertTrue(String.format("Got %s rows than expected. Expected %d but got %d", rows.length>i ? "less" : "more", rows.length, i), i == rows.length); } + /** + * Like assertRows(), but ignores the ordering of rows. + */ + public static void assertRowsIgnoringOrder(UntypedResultSet result, Object[]... 
rows) + { + if (result == null) + { + if (rows.length > 0) + Assert.fail(String.format("No rows returned by query but %d expected", rows.length)); + return; + } + + List<ColumnSpecification> meta = result.metadata(); + + Set<List<ByteBuffer>> expectedRows = new HashSet<>(rows.length); + for (Object[] expected : rows) + { + Assert.assertEquals("Invalid number of (expected) values provided for row", expected.length, meta.size()); + List<ByteBuffer> expectedRow = new ArrayList<>(meta.size()); + for (int j = 0; j < meta.size(); j++) + expectedRow.add(makeByteBuffer(expected[j], meta.get(j).type)); + expectedRows.add(expectedRow); + } + + Set<List<ByteBuffer>> actualRows = new HashSet<>(result.size()); + for (UntypedResultSet.Row actual : result) + { + List<ByteBuffer> actualRow = new ArrayList<>(meta.size()); + for (int j = 0; j < meta.size(); j++) + actualRow.add(actual.getBytes(meta.get(j).name.toString())); + actualRows.add(actualRow); + } + + com.google.common.collect.Sets.SetView<List<ByteBuffer>> extra = com.google.common.collect.Sets.difference(actualRows, expectedRows); + com.google.common.collect.Sets.SetView<List<ByteBuffer>> missing = com.google.common.collect.Sets.difference(expectedRows, actualRows); + if (!extra.isEmpty() || !missing.isEmpty()) + { + List<String> extraRows = makeRowStrings(extra, meta); + List<String> missingRows = makeRowStrings(missing, meta); + StringBuilder sb = new StringBuilder(); + if (!extra.isEmpty()) + { + sb.append("Got ").append(extra.size()).append(" extra row(s) "); + if (!missing.isEmpty()) + sb.append("and ").append(missing.size()).append(" missing row(s) "); + sb.append("in result. 
Extra rows:\n "); + sb.append(extraRows.stream().collect(Collectors.joining("\n "))); + if (!missing.isEmpty()) + sb.append("\nMissing Rows:\n ").append(missingRows.stream().collect(Collectors.joining("\n "))); + Assert.fail(sb.toString()); + } + + if (!missing.isEmpty()) + Assert.fail("Missing " + missing.size() + " row(s) in result: \n " + missingRows.stream().collect(Collectors.joining("\n "))); + } + + assert expectedRows.size() == actualRows.size(); + } + + private static List<String> makeRowStrings(Iterable<List<ByteBuffer>> rows, List<ColumnSpecification> meta) + { + List<String> strings = new ArrayList<>(); + for (List<ByteBuffer> row : rows) + { + StringBuilder sb = new StringBuilder("row("); + for (int j = 0; j < row.size(); j++) + { + ColumnSpecification column = meta.get(j); + sb.append(column.name.toString()).append("=").append(formatValue(row.get(j), column.type)); + if (j < (row.size() - 1)) + sb.append(", "); + } + strings.add(sb.append(")").toString()); + } + return strings; + } + protected void assertRowCount(UntypedResultSet result, int numExpectedRows) { if (result == null)
