This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d1f3d40afc5d20bab70c6200508baa3cd9409458 Merge: b063f30 0541c51 Author: Andrés de la Peña <[email protected]> AuthorDate: Mon Mar 1 12:48:32 2021 +0000 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../cassandra/cql3/statements/CQL3CasRequest.java | 27 +- .../org/apache/cassandra/db/SystemKeyspace.java | 4 +- .../apache/cassandra/db/filter/ColumnFilter.java | 205 ++++--- src/java/org/apache/cassandra/gms/Gossiper.java | 70 ++- .../repair/SystemDistributedKeyspace.java | 2 +- .../apache/cassandra/tracing/TraceKeyspace.java | 4 +- .../apache/cassandra/utils/CassandraVersion.java | 14 + .../cassandra/utils/ExpiringMemoizingSupplier.java | 7 + .../test/ReadDigestConsistencyTest.java | 146 +++++ .../distributed/upgrade/MixedModeReadTest.java | 72 +-- test/unit/org/apache/cassandra/Util.java | 17 + .../operations/InsertUpdateIfConditionTest.java | 50 +- .../cassandra/db/filter/ColumnFilterTest.java | 676 +++++++++++++-------- .../org/apache/cassandra/gms/GossiperTest.java | 29 +- 15 files changed, 885 insertions(+), 439 deletions(-) diff --cc CHANGES.txt index 4dfa9d9,6bd40ad..95028a6 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,40 -1,8 +1,41 @@@ -3.11.11 +4.0-beta5 + * Prevent parent repair sessions leak (CASSANDRA-16446) + * Fix timestamp issue in SinglePartitionSliceCommandTest testPartitionD…eletionRowDeletionTie (CASSANDRA-16443) + * Promote protocol V5 out of beta (CASSANDRA-14973) + * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429) + * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296) + * Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344) + * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422) + * Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083) + * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439) + * Improve system tables handling in case of disk failures 
(CASSANDRA-14793) + * Add access and datacenters to unreserved keywords (CASSANDRA-16398) + * Fix nodetool ring, status output when DNS resolution or port printing are in use (CASSANDRA-16283) + * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365) + * Move excessive repair debug loggings to trace level (CASSANDRA-16406) + * Restore validation of each message's protocol version (CASSANDRA-16374) + * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392) + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834) + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362) + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303) + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) +Merged from 3.11: + * Fix digest computation for queries with fetched but non queried columns (CASSANDRA-15962) + * Reduce amount of allocations during batch statement execution (CASSANDRA-16201) + * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393) Merged from 3.0: + * Fix ColumnFilter behaviour to prevent digest mitmatches during upgrades (CASSANDRA-16415) * Update debian packaging for python3 (CASSANDRA-16396) * Avoid pushing schema mutations when setting up distributed system keyspaces locally (CASSANDRA-16387) + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) + * Improve empty hint file handling during startup (CASSANDRA-16162) + * 
Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226) + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) Merged from 2.2: * Make TokenMetadata's ring version increments atomic (CASSANDRA-16286) diff --cc src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index d61381d,47920a4..563a639 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@@ -162,26 -161,21 +162,21 @@@ public class CQL3CasRequest implements } } - private PartitionColumns columnsToRead() + private RegularAndStaticColumns columnsToRead() { - // If all our conditions are columns conditions (IF x = ?), then it's enough to query - // the columns from the conditions. If we have a IF EXISTS or IF NOT EXISTS however, - // we need to query all columns for the row since if the condition fails, we want to - // return everything to the user. Static columns make this a bit more complex, in that - // if an insert only static columns, then the existence condition applies only to the - // static columns themselves, and so we don't want to include regular columns in that - // case. - if (hasExists) - { - RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns(); - Columns statics = updatesStaticRow ? allColumns.statics : Columns.NONE; - Columns regulars = updatesRegularRows ? allColumns.regulars : Columns.NONE; - return new RegularAndStaticColumns(statics, regulars); - } - return conditionColumns; - PartitionColumns allColumns = cfm.partitionColumns(); ++ RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns(); + + // If we update static row, we won't have any conditions on regular rows. + // If we update regular row, we have to fetch all regular rows (which would satisfy column condition) and + // static rows that take part in column condition. 
+ // In both cases, we're fetching enough rows to distinguish between "all conditions are nulls" and "row does not exist". + // We have to do this as we can't rely on row marker for that (see #6623) + Columns statics = updatesStaticRow ? allColumns.statics : conditionColumns.statics; + Columns regulars = updatesRegularRows ? allColumns.regulars : conditionColumns.regulars; - return new PartitionColumns(statics, regulars); ++ return new RegularAndStaticColumns(statics, regulars); } - public SinglePartitionReadCommand readCommand(int nowInSec) + public SinglePartitionReadQuery readCommand(int nowInSec) { assert staticConditions != null || !conditions.isEmpty(); diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 278541d,196face..25078b8 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -90,6 -91,6 +90,8 @@@ public final class SystemKeyspac // Cassandra was not previously installed and we're in the process of starting a fresh node. public static final CassandraVersion NULL_VERSION = new CassandraVersion("0.0.0-absent"); ++ public static final CassandraVersion CURRENT_VERSION = new CassandraVersion(FBUtilities.getReleaseVersionString()); ++ public static final String BATCHES = "batches"; public static final String PAXOS = "paxos"; public static final String BUILT_INDEXES = "IndexInfo"; @@@ -924,12 -941,12 +926,12 @@@ { try { - if (FBUtilities.getBroadcastAddress().equals(ep)) + if (FBUtilities.getBroadcastAddressAndPort().equals(ep)) { -- return new CassandraVersion(FBUtilities.getReleaseVersionString()); ++ return CURRENT_VERSION; } - String req = "SELECT release_version FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); + String req = "SELECT release_version FROM system.%s WHERE peer=? 
AND peer_port=?"; + UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port); if (result != null && result.one().has("release_version")) { return new CassandraVersion(result.one().getString("release_version")); diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java index b2ffa52,f405431..8ca2ccc --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@@ -20,22 -20,21 +20,25 @@@ package org.apache.cassandra.db.filter import java.io.IOException; import java.util.*; +import com.google.common.annotations.VisibleForTesting; - import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.SortedSetMultimap; import com.google.common.collect.TreeMultimap; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; ++import org.apache.cassandra.utils.CassandraVersion; /** * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected @@@ -66,67 -65,37 +69,127 @@@ */ public class ColumnFilter { + private final static Logger logger = LoggerFactory.getLogger(ColumnFilter.class); + + public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE); + public static final Serializer serializer = new Serializer(); - // True if _fetched_ is all the columns, in which case 
metadata must not be null. If false, - // then _fetched_ == _queried_ and we only store _queried_. - private final boolean isFetchAll; + // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be + // null. If false, then _fetched_ == _queried_ and we only store _queried_. ++ @VisibleForTesting + final boolean fetchAllRegulars; + ++ // This flag can be only set when fetchAllRegulars is set. When fetchAllRegulars is set and queried==null then ++ // it is implied to be true. The flag when set allows for interpreting the column filter in the same way as it was ++ // interpreted by pre 4.0 Cassandra versions (3.4 ~ 4.0), that is, we fetch all columns (both regulars and static) ++ // but we query only some of them. This allows for proper behaviour during upgrades. ++ private final boolean fetchAllStatics; ++ ++ @VisibleForTesting + final RegularAndStaticColumns fetched; - final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all - // static and regular columns are both _fetched_ and _queried_). + - private final PartitionColumns fetched; - private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_ ++ private final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all ++ ++ // static and regular columns are both _fetched_ and _queried_). 
private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null - private ColumnFilter(boolean isFetchAll, - PartitionColumns fetched, - PartitionColumns queried, + private ColumnFilter(boolean fetchAllRegulars, ++ boolean fetchAllStatics, + TableMetadata metadata, + RegularAndStaticColumns queried, SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections) { - assert !isFetchAll || fetched != null; - assert isFetchAll || queried != null; - this.isFetchAll = isFetchAll; - this.fetched = isFetchAll ? fetched : queried; + assert !fetchAllRegulars || metadata != null; + assert fetchAllRegulars || queried != null; ++ assert !fetchAllStatics || fetchAllRegulars; + this.fetchAllRegulars = fetchAllRegulars; ++ this.fetchAllStatics = fetchAllStatics || fetchAllRegulars && queried == null; + + if (fetchAllRegulars) + { + RegularAndStaticColumns all = metadata.regularAndStaticColumns(); + - this.fetched = (all.statics.isEmpty() || queried == null) ++ this.fetched = (all.statics.isEmpty() || queried == null || fetchAllStatics) + ? all + : new RegularAndStaticColumns(queried.statics, all.regulars); + } + else + { + this.fetched = queried; + } + this.queried = queried; this.subSelections = subSelections; } /** + * Used on replica for deserialisation + */ + private ColumnFilter(boolean fetchAllRegulars, ++ boolean fetchAllStatics, + RegularAndStaticColumns fetched, + RegularAndStaticColumns queried, + SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections) + { + assert !fetchAllRegulars || fetched != null; + assert fetchAllRegulars || queried != null; ++ assert !fetchAllStatics || fetchAllRegulars; + this.fetchAllRegulars = fetchAllRegulars; ++ this.fetchAllStatics = fetchAllStatics || fetchAllRegulars && queried == null; + this.fetched = fetchAllRegulars ? 
fetched : queried; + this.queried = queried; + this.subSelections = subSelections; + } + + /** ++ * Returns true if all static columns should be fetched along with all regular columns (it only makes sense to call ++ * this method if fetchAllRegulars is going to be true and queried != null). ++ * ++ * We have to apply this conversion when there are pre-4.0 nodes in the cluster because they interpret ++ * the ColumnFilter with fetchAllRegulars (translated to fetchAll in pre 4.0) and queried != null so that all ++ * the columns are fetched (both regular and static) and just some of them are queried. In 4.0+ with the same ++ * scenario, all regulars are fetched and only those statics which are queried. We need to apply the conversion ++ * so that the retrieved data is the same (note that non-queried columns may have skipped values or may not be ++ * included at all). ++ */ ++ private static boolean shouldFetchAllStatics() ++ { ++ if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0)) ++ { ++ logger.trace("ColumnFilter conversion has been applied so that all static columns will be fetched because there are pre 4.0 nodes in the cluster"); ++ return true; ++ } ++ return false; ++ } ++ ++ /** ++ * Returns true if we want to consider all fetched columns as queried as well (it only makes sense to call ++ * this method if fetchAllRegulars is going to be true). ++ * ++ * We have to apply this conversion when there are pre-3.4 (in particular, pre CASSANDRA-10657) nodes in the cluster ++ * because they interpret the ColumnFilter with fetchAllRegulars (translated to fetchAll in pre 4.0) so that all ++ * fetched columns are queried. In 3.4+ with the same scenario, all the columns are fetched ++ * (though see {@link #shouldFetchAllStatics()}) but queried columns are taken into account in the way that we may ++ * skip values or whole cells when reading data. We need to apply the conversion so that the retrieved data is ++ * the same. 
++ */ ++ private static boolean shouldQueriedBeNull() ++ { ++ if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_3_4)) ++ { ++ logger.trace("ColumnFilter conversion has been applied so that all columns will be queried because there are pre 3.4 nodes in the cluster"); ++ return true; ++ } ++ return false; ++ } ++ ++ /** * A filter that includes all columns for the provided table. */ - public static ColumnFilter all(CFMetaData metadata) + public static ColumnFilter all(TableMetadata metadata) { - return new ColumnFilter(true, metadata, null, null); - return new ColumnFilter(true, metadata.partitionColumns(), null, null); ++ return new ColumnFilter(true, true, metadata, null, null); } /** @@@ -136,18 -105,30 +199,18 @@@ * preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and * for #6588 if/when we implement it). */ - public static ColumnFilter selection(PartitionColumns columns) + public static ColumnFilter selection(RegularAndStaticColumns columns) { - return new ColumnFilter(false, (TableMetadata) null, columns, null); - return new ColumnFilter(false, null, columns, null); ++ return new ColumnFilter(false, false, (TableMetadata) null, columns, null); } -- /** ++ /** * A filter that fetches all columns for the provided table, but returns * only the queried ones. */ - public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried) + public static ColumnFilter selection(TableMetadata metadata, RegularAndStaticColumns queried) { - return new ColumnFilter(true, metadata, queried, null); - // When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it - // is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns - // which are not in queried set when fetchAll is enabled. That makes exactly the same filter being - // interpreted in a different way on 3.4- and 3.4+. 
- // - // Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is - // interpreted the same way on 3.4- because that Cassandra version does not support such filtering. - // - // In order to avoid inconsitencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible - // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null. - // - // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415 - return new ColumnFilter(true, metadata.partitionColumns(), Gossiper.instance.isAnyNodeOn30() ? null : queried, null); ++ return new ColumnFilter(true, shouldFetchAllStatics(), metadata, shouldQueriedBeNull() ? null : queried, null); } /** @@@ -170,22 -151,9 +233,22 @@@ return queried == null ? fetched : queried; } - public boolean fetchesAllColumns() + /** + * Wether all the (regular or static) columns are fetched by this filter. + * <p> + * Note that this method is meant as an optimization but a negative return + * shouldn't be relied upon strongly: this can return {@code false} but + * still have all the columns fetches if those were manually selected by the + * user. The goal here is to cheaply avoid filtering things on wildcard + * queries, as those are common. + * + * @param isStatic whether to check for static columns or not. If {@code true}, + * the method returns if all static columns are fetched, otherwise it checks + * regular columns. + */ + public boolean fetchesAllColumns(boolean isStatic) { - return isStatic ? queried == null : fetchAllRegulars; - return isFetchAll; ++ return isStatic ? queried == null || fetchAllStatics : fetchAllRegulars; } /** @@@ -200,14 -168,9 +263,14 @@@ /** * Whether the provided column is fetched by this filter. 
*/ - public boolean fetches(ColumnDefinition column) + public boolean fetches(ColumnMetadata column) { - return isFetchAll || queried.contains(column); + // For statics, it is included only if it's part of _queried_, or if _queried_ is null (wildcard query). + if (column.isStatic()) - return queried == null || queried.contains(column); ++ return fetchAllStatics || queried == null || queried.contains(column); + + // For regulars, if 'fetchAllRegulars', then it's included automatically. Otherwise, it depends on _queried_. + return fetchAllRegulars || queried.contains(column); } /** @@@ -271,24 -233,7 +334,24 @@@ if (s.isEmpty()) return null; - return new Tester(!column.isStatic() && fetchAllRegulars, s.iterator()); - return new Tester(isFetchAll, s.iterator()); ++ return new Tester(!column.isStatic() && fetchAllRegulars || column.isStatic() && fetchAllStatics, s.iterator()); + } + + /** + * Given an iterator on the cell of a complex column, returns an iterator that only include the cells selected by + * this filter. + * + * @param column the (complex) column for which the cells are. + * @param cells the cells to filter. + * @return a filtered iterator that only include the cells from {@code cells} that are included by this filter. 
+ */ + public Iterator<Cell<?>> filterComplexCells(ColumnMetadata column, Iterator<Cell<?>> cells) + { + Tester tester = newTester(column); + if (tester == null) + return cells; + + return Iterators.filter(cells, cell -> tester.fetchedCellIsQueried(cell.path())); } /** @@@ -449,17 -371,17 +512,25 @@@ { s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder()); for (ColumnSubselection subSelection : subSelections) - s.put(subSelection.column().name, subSelection); - } - - // See the comment in {@link ColumnFilter#selection(CFMetaData, PartitionColumns)} - if (isFetchAll && queried != null && Gossiper.instance.isAnyNodeOn30()) - { - logger.trace("Column filter will be automatically converted to query all columns because 3.0 nodes are present in the cluster"); - queried = null; + { + if (fullySelectedComplexColumns == null || !fullySelectedComplexColumns.contains(subSelection.column())) + s.put(subSelection.column().name, subSelection); + } } - // see CASSANDRA-15833 - if (isFetchAll && Gossiper.instance.haveMajorVersion3Nodes()) - queried = null; - - return new ColumnFilter(isFetchAll, metadata, queried, s); - return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : null, queried, s); ++ // When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it ++ // is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns ++ // which are not in queried set when fetchAll is enabled. That makes exactly the same filter being ++ // interpreted in a different way on 3.4- and 3.4+. ++ // ++ // Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is ++ // interpreted the same way on 3.4- because that Cassandra version does not support such filtering. 
++ // ++ // In order to avoid inconsitencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible ++ // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null. ++ // ++ // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415 ++ return new ColumnFilter(isFetchAll, isFetchAll && shouldFetchAllStatics(), metadata, isFetchAll && shouldQueriedBeNull() ? null : queried, s); } } @@@ -474,7 -396,7 +545,8 @@@ ColumnFilter otherCf = (ColumnFilter) other; - return otherCf.isFetchAll == this.isFetchAll && + return otherCf.fetchAllRegulars == this.fetchAllRegulars && ++ otherCf.fetchAllStatics == this.fetchAllStatics && Objects.equals(otherCf.fetched, this.fetched) && Objects.equals(otherCf.queried, this.queried) && Objects.equals(otherCf.subSelections, this.subSelections); @@@ -483,87 -405,49 +555,59 @@@ @Override public String toString() { - if (fetchAllRegulars && queried == null) - return "*"; + String prefix = ""; - if (isFetchAll && queried == null) + - if (queried.isEmpty()) - return ""; ++ if (fetchAllRegulars && queried == null) + return "*/*"; - Iterator<ColumnMetadata> defs = queried.selectOrderIterator(); - if (!defs.hasNext()) - return "<none>"; - if (isFetchAll) ++ if (fetchAllRegulars && fetchAllStatics) + prefix = "*/"; - StringBuilder sb = new StringBuilder(); - while (defs.hasNext()) - if (queried.isEmpty()) - return prefix + "[]"; ++ if (fetchAllRegulars && !fetchAllStatics) + { - appendColumnDef(sb, defs.next()); - if (defs.hasNext()) - sb.append(", "); ++ prefix = queried.statics.isEmpty() ++ ? 
"<all regulars>/" ++ : String.format("<all regulars>+%s/", columnsToString(queried.statics::selectOrderIterator)); + } - return sb.toString(); ++ ++ return prefix + columnsToString(queried::selectOrderIterator); + } - private void appendColumnDef(StringBuilder sb, ColumnMetadata column) ++ private String columnsToString(Iterable<ColumnMetadata> columns) + { - if (subSelections == null) + StringJoiner joiner = new StringJoiner(", ", "[", "]"); - Iterator<ColumnDefinition> it = queried.selectOrderIterator(); ++ Iterator<ColumnMetadata> it = columns.iterator(); + while (it.hasNext()) { - sb.append(column.name); - return; - } - ColumnDefinition column = it.next(); ++ ColumnMetadata column = it.next(); + SortedSet<ColumnSubselection> s = subSelections != null ? subSelections.get(column.name) : Collections.emptySortedSet(); - SortedSet<ColumnSubselection> s = subSelections.get(column.name); - if (s.isEmpty()) - { - sb.append(column.name); - return; + if (s.isEmpty()) + joiner.add(String.valueOf(column.name)); + else - s.forEach(subSel -> joiner.add(String.format("%s%s", column.name , subSel))); ++ s.forEach(subSel -> joiner.add(String.format("%s%s", column.name, subSel))); } - - int i = 0; - for (ColumnSubselection subSel : s) - sb.append(i++ == 0 ? "" : ", ").append(column.name).append(subSel); - return prefix + joiner.toString(); ++ return joiner.toString(); } public static class Serializer { - private static final int FETCH_ALL_MASK = 0x01; - private static final int HAS_QUERIED_MASK = 0x02; - private static final int IS_FETCH_ALL_MASK = 0x01; - private static final int HAS_QUERIED_MASK = 0x02; ++ private static final int FETCH_ALL_MASK = 0x01; ++ private static final int HAS_QUERIED_MASK = 0x02; private static final int HAS_SUB_SELECTIONS_MASK = 0x04; private static int makeHeaderByte(ColumnFilter selection) { - return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0) - | (selection.queried != null ? HAS_QUERIED_MASK : 0) - | (selection.subSelections != null ? 
HAS_SUB_SELECTIONS_MASK : 0); + return (selection.fetchAllRegulars ? FETCH_ALL_MASK : 0) - | (selection.queried != null ? HAS_QUERIED_MASK : 0) - | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0); - } - - @VisibleForTesting - public static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection, int version) - { - if (version > MessagingService.VERSION_3014 || !selection.fetchAllRegulars || selection.queried == null) - return selection; - - // The meaning of fetchAllRegulars changed (at least when queried != null) due to CASSANDRA-12768: in - // pre-4.0 it means that *all* columns are fetched, not just the regular ones, and so 3.0/3.X nodes - // would send us more than we'd like. So instead recreating a filter that correspond to what we - // actually want (it's a tiny bit less efficient as we include all columns manually and will mark as - // queried some columns that are actually only fetched, but it's fine during upgrade). - // More concretely, we replace our filter by a non-fetch-all one that queries every columns that our - // current filter fetches. - Set<ColumnMetadata> queriedStatic = new HashSet<>(); - Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnMetadata::isStatic)); - return new ColumnFilter(false, - (TableMetadata) null, - new RegularAndStaticColumns(Columns.from(queriedStatic), selection.fetched.regulars), - selection.subSelections); ++ | (selection.queried != null ? HAS_QUERIED_MASK : 0) ++ | (selection.subSelections != null ? 
HAS_SUB_SELECTIONS_MASK : 0); } public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException { - selection = maybeUpdateForBackwardCompatility(selection, version); - out.writeByte(makeHeaderByte(selection)); - if (version >= MessagingService.VERSION_3014 && selection.isFetchAll) + if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars) { Columns.serializer.serialize(selection.fetched.statics, out); Columns.serializer.serialize(selection.fetched.regulars, out); @@@ -618,7 -502,7 +662,7 @@@ if (hasSubSelections) { subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder()); -- int size = (int)in.readUnsignedVInt(); ++ int size = (int) in.readUnsignedVInt(); for (int i = 0; i < size; i++) { ColumnSubselection subSel = ColumnSubselection.serializer.deserialize(in, version, metadata); @@@ -626,29 -510,25 +670,14 @@@ } } - // See CASSANDRA-15833 - if (version <= MessagingService.VERSION_3014 && isFetchAll) - // Since nodes with and without CASSANDRA-10657 are not distinguishable by messaging version, we need to - // check whether there are any nodes running pre CASSANDRA-10657 Cassandra and apply conversion to the - // column filter so that it is interpreted in the same way as on the nodes without CASSANDRA-10657. - // - // See the comment in {@link ColumnFilter#selection(CFMetaData, PartitionColumns)} - if (isFetchAll && queried != null && Gossiper.instance.isAnyNodeOn30()) - { - logger.trace("Deserialized column filter will be automatically converted to query all columns because 3.0 nodes are present in the cluster"); -- queried = null; - - // Same concern than in serialize/serializedSize: we should be wary of the change in meaning for isFetchAll. - // If we get a filter with isFetchAll from 3.0/3.x, it actually expects all static columns to be fetched, - // make sure we do that (note that if queried == null, that's already what we do). 
- // Note that here again this will make us do a bit more work that necessary, namely we'll _query_ all - // statics even though we only care about _fetching_ them all, but that's a minor inefficiency, so fine - // during upgrade. - if (version <= MessagingService.VERSION_30 && isFetchAll && queried != null) - queried = new RegularAndStaticColumns(metadata.staticColumns(), queried.regulars); - } -- -- return new ColumnFilter(isFetchAll, fetched, queried, subSelections); ++ return new ColumnFilter(isFetchAll, isFetchAll && shouldFetchAllStatics(), fetched, isFetchAll && shouldQueriedBeNull() ? null : queried, subSelections); } public long serializedSize(ColumnFilter selection, int version) { - selection = maybeUpdateForBackwardCompatility(selection, version); - long size = 1; // header byte - if (version >= MessagingService.VERSION_3014 && selection.isFetchAll) + if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars) { size += Columns.serializer.serializedSize(selection.fetched.statics); size += Columns.serializer.serializedSize(selection.fetched.regulars); @@@ -671,4 -551,4 +700,4 @@@ return size; } } --} ++} diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 1129c29,819078d..7acaec9 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -160,38 -157,7 +160,54 @@@ public class Gossiper implements IFailu private volatile long lastProcessedMessageAt = System.currentTimeMillis(); - //This property and anything that checks it should be removed in 5.0 - private boolean haveMajorVersion3Nodes = true; - private static FastThreadLocal<Boolean> isGossipStage = new FastThreadLocal<>(); ++ /** ++ * This property is initially set to {@code true} which means that we have no information about the other nodes. ++ * Once all nodes are on at least this node version, it becomes {@code false}, which means that we are not ++ * upgrading from the previous version (major, minor). 
++ * ++ * This property and anything that checks it should be removed in 5.0 ++ */ ++ private volatile boolean upgradeInProgressPossible = true; + - final Supplier<ExpiringMemoizingSupplier.ReturnValue<Boolean>> haveMajorVersion3NodesSupplier = () -> ++ final Supplier<ExpiringMemoizingSupplier.ReturnValue<CassandraVersion>> upgradeFromVersionSupplier = () -> + { - //Once there are no prior version nodes we don't need to keep rechecking - if (!haveMajorVersion3Nodes) - return new ExpiringMemoizingSupplier.Memoized<>(false); ++ // Once there are no prior version nodes we don't need to keep rechecking ++ if (!upgradeInProgressPossible) ++ return new ExpiringMemoizingSupplier.Memoized<>(null); + + Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); - CassandraVersion referenceVersion = null; + ++ CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION.familyLowerBound.get(); ++ boolean allHostsHaveKnownVersion = true; + for (InetAddressAndPort host : allHosts) + { + CassandraVersion version = getReleaseVersion(host); + + //Raced with changes to gossip state, wait until next iteration + if (version == null) - return new ExpiringMemoizingSupplier.NotMemoized(true); ++ allHostsHaveKnownVersion = false; ++ else if (version.compareTo(minVersion) < 0) ++ minVersion = version; ++ } + - if (referenceVersion == null) - referenceVersion = version; ++ if (minVersion.compareTo(SystemKeyspace.CURRENT_VERSION.familyLowerBound.get()) < 0) ++ return new ExpiringMemoizingSupplier.Memoized<>(minVersion); + - if (version.major < 4) - return new ExpiringMemoizingSupplier.Memoized<>(true); - } ++ if (!allHostsHaveKnownVersion) ++ return new ExpiringMemoizingSupplier.NotMemoized<>(minVersion); + - haveMajorVersion3Nodes = false; - return new ExpiringMemoizingSupplier.Memoized(false); ++ upgradeInProgressPossible = false; ++ return new ExpiringMemoizingSupplier.Memoized<>(null); + }; + - private 
Supplier<Boolean> haveMajorVersion3NodesMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(haveMajorVersion3NodesSupplier, 1, TimeUnit.MINUTES); ++ private final Supplier<CassandraVersion> upgradeFromVersionMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(upgradeFromVersionSupplier, 1, TimeUnit.MINUTES); ++ ++ @VisibleForTesting ++ public void expireUpgradeFromVersion() ++ { ++ upgradeInProgressPossible = true; ++ ((ExpiringMemoizingSupplier<CassandraVersion>) upgradeFromVersionMemoized).expire(); ++ } private static final boolean disableThreadValidation = Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION); @@@ -2119,56 -1827,11 +2135,74 @@@ logger.info("No gossip backlog; proceeding"); } - @VisibleForTesting - public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + /** + * Blockingly wait for all live nodes to agree on the current schema version. + * + * @param maxWait maximum time to wait for schema agreement + * @param unit TimeUnit of maxWait + * @return true if agreement was reached, false if not + */ + public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition) { - stop(); - ExecutorUtils.shutdownAndWait(timeout, unit, executor); + int waited = 0; + int toWait = 50; + + Set<InetAddressAndPort> members = getLiveTokenOwners(); + + while (true) + { + if (nodesAgreeOnSchema(members)) + return true; + + if (waited >= unit.toMillis(maxWait) || abortCondition.getAsBoolean()) + return false; + + Uninterruptibles.sleepUninterruptibly(toWait, TimeUnit.MILLISECONDS); + waited += toWait; + toWait = Math.min(1000, toWait * 2); + } + } + - public boolean haveMajorVersion3Nodes() ++ /** ++ * Returns {@code false} only if the information about the version of each node in the cluster is available and ++ * ALL the nodes are on 4.0+ (regardless of the patch version). 
++ */ ++ public boolean hasMajorVersion3Nodes() + { - return haveMajorVersion3NodesMemoized.get(); ++ return isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0) || // this is quite obvious ++ // however if we discovered only nodes at current version so far (in particular only this node), ++ // but still there are nodes with unknown version, we also want to report that the cluster may have nodes at 3.x ++ upgradeInProgressPossible && !isUpgradingFromVersionLowerThan(SystemKeyspace.CURRENT_VERSION); ++ } ++ ++ /** ++ * Returns {@code true} if there are nodes on version lower than the provided version (only major / minor counts) ++ */ ++ public boolean isUpgradingFromVersionLowerThan(CassandraVersion referenceVersion) { ++ CassandraVersion v = upgradeFromVersionMemoized.get(); ++ if (SystemKeyspace.NULL_VERSION.equals(v) && scheduledGossipTask == null) ++ return false; ++ else ++ return v != null && v.compareTo(referenceVersion.familyLowerBound.get()) < 0; + } + + private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes) + { + UUID expectedVersion = null; + + for (InetAddressAndPort node : nodes) + { + EndpointState state = getEndpointStateForEndpoint(node); + UUID remoteVersion = state.getSchemaVersion(); + + if (null == expectedVersion) + expectedVersion = remoteVersion; + + if (null == expectedVersion || !expectedVersion.equals(remoteVersion)) + return false; + } + + return true; } @VisibleForTesting diff --cc src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index c62818a,eb2226b..7e8d8bc --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@@ -210,28 -192,17 +210,28 @@@ public final class SystemDistributedKey processSilent(fmtQuery); } - public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints) + public static void
startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange) { - String coordinator = FBUtilities.getBroadcastAddress().getHostAddress(); - Set<String> participants = Sets.newHashSet(coordinator); + //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors + //due to schema differences - boolean includeNewColumns = !Gossiper.instance.haveMajorVersion3Nodes(); ++ boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes(); + + InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort(); + Set<String> participants = Sets.newHashSet(); + Set<String> participants_v2 = Sets.newHashSet(); - for (InetAddress endpoint : endpoints) - participants.add(endpoint.getHostAddress()); + for (InetAddressAndPort endpoint : commonRange.endpoints) + { + participants.add(endpoint.getHostAddress(false)); + participants_v2.add(endpoint.getHostAddressAndPort()); + } String query = + "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " + + "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', %d, { '%s' }, { '%s' }, '%s', toTimestamp(now()))"; + String queryWithoutNewColumns = "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " + - "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))"; + "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))"; for (String cfname : cfnames) { diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java index 8c6d8c8,0d7c4f1..c2e74d8 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@@ -114,16 -105,14 +114,16 @@@ public final class TraceKeyspac int ttl) { PartitionUpdate.SimpleBuilder builder = 
PartitionUpdate.simpleBuilder(Sessions, sessionId); - builder.row() - .ttl(ttl) - .add("client", client) - .add("coordinator", FBUtilities.getBroadcastAddress()) - .add("request", request) - .add("started_at", new Date(startedAt)) - .add("command", command) - .appendAll("parameters", parameters); + Row.SimpleBuilder rb = builder.row(); + rb.ttl(ttl) + .add("client", client) + .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address); - if (!Gossiper.instance.haveMajorVersion3Nodes()) ++ if (!Gossiper.instance.hasMajorVersion3Nodes()) + rb.add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port); + rb.add("request", request) + .add("started_at", new Date(startedAt)) + .add("command", command) + .appendAll("parameters", parameters); return builder.buildAsMutation(); } @@@ -144,10 -133,8 +144,10 @@@ .ttl(ttl); rowBuilder.add("activity", message) - .add("source", FBUtilities.getBroadcastAddress()) - .add("thread", threadName); + .add("source", FBUtilities.getBroadcastAddressAndPort().address); - if (!Gossiper.instance.haveMajorVersion3Nodes()) ++ if (!Gossiper.instance.hasMajorVersion3Nodes()) + rowBuilder.add("source_port", FBUtilities.getBroadcastAddressAndPort().port); + rowBuilder.add("thread", threadName); if (elapsed >= 0) rowBuilder.add("source_elapsed", elapsed); diff --cc src/java/org/apache/cassandra/utils/CassandraVersion.java index b3dca96,bf9fe6a..8638f0e --- a/src/java/org/apache/cassandra/utils/CassandraVersion.java +++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java @@@ -18,13 -18,10 +18,15 @@@ package org.apache.cassandra.utils; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; ++import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.base.Objects; +import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Suppliers; import org.apache.commons.lang3.StringUtils; 
/** @@@ -36,21 -33,19 +38,26 @@@ public class CassandraVersion implements Comparable<CassandraVersion> { /** - * note: 3rd group matches to words but only allows number and checked after regexp test. + * note: 3rd/4th groups matches to words but only allows number and checked after regexp test. * this is because 3rd and the last can be identical. **/ - private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(\\-[.\\w]+)?([.+][.\\w]+)?"; - private static final Pattern PATTERN_WHITESPACE = Pattern.compile("\\w+"); + private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(?:\\.(\\w+))?(\\-[-.\\w]+)?([.+][.\\w]+)?"; + private static final Pattern PATTERN_WORDS = Pattern.compile("\\w+"); + @VisibleForTesting + static final int NO_HOTFIX = -1; - private static final Pattern pattern = Pattern.compile(VERSION_REGEXP); - private static final Pattern SNAPSHOT = Pattern.compile("-SNAPSHOT"); + private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP); + ++ public static final CassandraVersion CASSANDRA_4_0 = new CassandraVersion("4.0").familyLowerBound.get(); ++ public static final CassandraVersion CASSANDRA_3_4 = new CassandraVersion("3.4").familyLowerBound.get(); + public final int major; public final int minor; public final int patch; + public final int hotfix; + ++ public final Supplier<CassandraVersion> familyLowerBound = Suppliers.memoize(this::getFamilyLowerBound); + private final String[] preRelease; private final String[] build; @@@ -97,6 -81,6 +104,13 @@@ } } ++ private CassandraVersion getFamilyLowerBound() ++ { ++ return patch == 0 && hotfix == NO_HOTFIX && preRelease != null && preRelease.length == 0 && build == null ++ ? 
this ++ : new CassandraVersion(major, minor, 0, NO_HOTFIX, new String[0], null); ++ } ++ private static String[] parseIdentifiers(String version, String str) { // Drop initial - or + diff --cc src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java index 0280541,0000000..1736ae2 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java @@@ -1,130 -1,0 +1,137 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + ++import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * An implementation similar to Guava's Suppliers.memoizeWithExpiration(Supplier) + * but allowing for memoization to be skipped. + * + * See CASSANDRA-16148 + */ +public class ExpiringMemoizingSupplier<T> implements Supplier<T> +{ + final Supplier<ReturnValue<T>> delegate; + final long durationNanos; + transient volatile T value; + // The special value 0 means "not yet initialized". 
+ transient volatile long expirationNanos; + + public static <T> Supplier<T> memoizeWithExpiration(Supplier<ReturnValue<T>> delegate, long duration, TimeUnit unit) + { + return new ExpiringMemoizingSupplier<>(delegate, duration, unit); + } + + ExpiringMemoizingSupplier(Supplier<ReturnValue<T>> delegate, long duration, TimeUnit unit) { + this.delegate = Preconditions.checkNotNull(delegate); + this.durationNanos = unit.toNanos(duration); + Preconditions.checkArgument(duration > 0); + } + + @Override + public T get() { + // Another variant of Double Checked Locking. + // + // We use two volatile reads. We could reduce this to one by + // putting our fields into a holder class, but (at least on x86) + // the extra memory consumption and indirection are more + // expensive than the extra volatile reads. + long nanos = this.expirationNanos; + long now = System.nanoTime(); + if (nanos == 0L || now - nanos >= 0L) { + synchronized(this) { + if (nanos == this.expirationNanos) { + ReturnValue<T> t = this.delegate.get(); + if (t.canMemoize()) + this.value = t.value(); + else + return t.value(); + + nanos = now + this.durationNanos; + this.expirationNanos = nanos == 0L ? 
1L : nanos; + return t.value(); + } + } + } + return this.value; + } + ++ @VisibleForTesting ++ public void expire() ++ { ++ this.expirationNanos = 0; ++ } ++ + @Override + public String toString() { + // This is a little strange if the unit the user provided was not NANOS, + // but we don't want to store the unit just for toString + return "Suppliers.memoizeWithExpiration(" + delegate + ", " + durationNanos + ", NANOS)"; + } + + private static final long serialVersionUID = 0; + + public static abstract class ReturnValue<T> + { + protected final T value; + + ReturnValue(T value){ + this.value = value; + } + + abstract boolean canMemoize(); + + public T value() + { + return value; + } + } + + public static class Memoized<T> extends ReturnValue<T> + { + public Memoized(T value) + { + super(value); + } + + public boolean canMemoize() + { + return true; + } + } + + public static class NotMemoized<T> extends ReturnValue<T> + { + public NotMemoized(T value) + { + super(value); + } + + public boolean canMemoize() + { + return false; + } + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java index 0000000,8071eea..05e705b mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java @@@ -1,0 -1,113 +1,146 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + + import java.util.Arrays; + import java.util.UUID; + + import org.junit.Assert; + import org.junit.Test; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; + + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.ICoordinator; ++import org.apache.cassandra.exceptions.SyntaxException; ++import org.apache.cassandra.utils.Throwables; + + public class ReadDigestConsistencyTest extends TestBaseImpl + { ++ private final static Logger logger = LoggerFactory.getLogger(ReadDigestConsistencyTest.class); ++ + public static final String TABLE_NAME = "tbl"; + public static final String CREATE_TABLE = String.format("CREATE TABLE %s.%s (" + + " k int, " + + " c int, " + + " s1 int static, " + + " s2 set<int> static, " + + " v1 int, " + + " v2 set<int>, " + + " PRIMARY KEY (k, c))", KEYSPACE, TABLE_NAME); + + public static final String INSERT = String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (?, ?, ?, ?, ?, ?)", KEYSPACE, TABLE_NAME); + + + public static final String SELECT_TRACE = "SELECT activity FROM system_traces.events where session_id = ? and source = ? 
ALLOW FILTERING;"; + + @Test + public void testDigestConsistency() throws Exception + { + try (Cluster cluster = init(builder().withNodes(2).start())) + { + cluster.schemaChange(CREATE_TABLE); + insertData(cluster.coordinator(1)); + testDigestConsistency(cluster.coordinator(1)); + testDigestConsistency(cluster.coordinator(2)); + } + } + + public static void checkTraceForDigestMismatch(ICoordinator coordinator, String query, Object... boundValues) + { + UUID sessionId = UUID.randomUUID(); - coordinator.executeWithTracing(sessionId, query, ConsistencyLevel.ALL, boundValues); ++ try ++ { ++ coordinator.executeWithTracing(sessionId, query, ConsistencyLevel.ALL, boundValues); ++ } ++ catch (RuntimeException ex) ++ { ++ if (Throwables.isCausedBy(ex, t -> t.getClass().getName().equals(SyntaxException.class.getName()))) ++ { ++ if (coordinator.instance().getReleaseVersionString().startsWith("3.") && query.contains("[")) ++ { ++ logger.warn("Query {} is not supported on node {} version {}", ++ query, ++ coordinator.instance().broadcastAddress().getAddress().getHostAddress(), ++ coordinator.instance().getReleaseVersionString()); ++ ++ // we can forgive SyntaxException for C* < 4.0 if the query contains collection element selection ++ return; ++ } ++ } ++ logger.error("Failing for coordinator {} and query {}", coordinator.instance().getReleaseVersionString(), query); ++ throw ex; ++ } + Object[][] results = coordinator.execute(SELECT_TRACE, + ConsistencyLevel.ALL, + sessionId, + coordinator.instance().broadcastAddress().getAddress()); + for (Object[] result : results) + { + String activity = (String) result[0]; + Assert.assertFalse(String.format("Found Digest Mismatch while executing query: %s with bound values %s on %s/%s", + query, + Arrays.toString(boundValues), + coordinator.instance().broadcastAddress(), + coordinator.instance().getReleaseVersionString()), + activity.toLowerCase().contains("mismatch for key")); + } + } + + public static void insertData(ICoordinator 
coordinator) + { + coordinator.execute(String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (1, 1, 2, {1, 2, 3, 4, 5}, 3, {6, 7, 8, 9, 10})", KEYSPACE, TABLE_NAME), ConsistencyLevel.ALL); + coordinator.execute(String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (1, 2, 3, {2, 3, 4, 5, 6}, 4, {7, 8, 9, 10, 11})", KEYSPACE, TABLE_NAME), ConsistencyLevel.ALL); + } + + public static void testDigestConsistency(ICoordinator coordinator) + { + String queryPattern = "SELECT %s FROM %s.%s WHERE %s"; + String[] columnss1 = { + "k, c", + "*", + "v1", + "v2", + "v1, s1", - "v1, s2" ++ "v1, s2", ++ "v2[3]", ++ "v2[2..4]", ++ "v1, s2[7]", ++ "v1, s2[6..8]" + }; + + String[] columnss2 = { + "s1", - "s2" ++ "s2", ++ "s2[7]", ++ "s2[6..8]" + }; + + for (String columns : columnss1) + { + checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1")); + checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1 AND c = 2")); + } + for (String columns : columnss2) + { + checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1")); + } + } + } diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java index 6ee9b0a,d908cd5..f9a3542 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java @@@ -18,15 -18,16 +18,16 @@@ package org.apache.cassandra.distributed.upgrade; - import java.util.UUID; - - import org.junit.Assert; import org.junit.Test; - import org.apache.cassandra.distributed.UpgradeableCluster; - import org.apache.cassandra.distributed.api.ConsistencyLevel; - import org.apache.cassandra.distributed.shared.DistributedTestBase; -import org.apache.cassandra.distributed.api.Feature; + import org.apache.cassandra.distributed.api.IInvokableInstance; import 
org.apache.cassandra.distributed.shared.Versions; + import org.apache.cassandra.gms.Gossiper; ++import org.apache.cassandra.utils.CassandraVersion; + + import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.CREATE_TABLE; + import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.insertData; + import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.testDigestConsistency; public class MixedModeReadTest extends UpgradeTestBase { @@@ -51,27 -37,28 +37,29 @@@ new TestCase() .nodes(2) .nodesToUpgrade(1) - .upgrade(Versions.Major.v30, Versions.Major.v3X) - .withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK)) + .upgrade(Versions.Major.v30, Versions.Major.v4) + .upgrade(Versions.Major.v3X, Versions.Major.v4) .setup(cluster -> { cluster.schemaChange(CREATE_TABLE); - cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "foo", "bar", "baz"); - cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "fi", "biz", "baz"); - cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "fo", "boz", "baz"); - - // baseline to show no digest mismatches before upgrade - checkTraceForDigestMismatch(cluster, 1, SELECT_C1, 1); - checkTraceForDigestMismatch(cluster, 2, SELECT_C1, 1); + insertData(cluster.coordinator(1)); + testDigestConsistency(cluster.coordinator(1)); + testDigestConsistency(cluster.coordinator(2)); }) - .runAfterNodeUpgrade((cluster, node) -> { - if (node != 1) - return; // shouldn't happen but guard for future test changes + .runAfterClusterUpgrade(cluster -> { + // we need to let gossip settle or the test will fail + int attempts = 1; + //noinspection Convert2MethodRef - while (!((IInvokableInstance) (cluster.get(1))).callOnInstance(() -> Gossiper.instance.isAnyNodeOn30())) ++ while (!((IInvokableInstance) (cluster.get(1))).callOnInstance(() -> 
Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0.familyLowerBound.get()) && ++ !Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion(("3.0")).familyLowerBound.get()))) + { + if (attempts++ > 30) - throw new RuntimeException("Gossiper.instance.isAnyNodeOn30() continually returns false despite expecting to be true"); ++ throw new RuntimeException("Gossiper.instance.haveMajorVersion3Nodes() continually returns false despite expecting to be true"); + Thread.sleep(1000); + } // should not cause a disgest mismatch in mixed mode - checkTraceForDigestMismatch(cluster, 1, SELECT_C1, 1); - checkTraceForDigestMismatch(cluster, 2, SELECT_C1, 1); - checkTraceForDigestMismatch(cluster, 1, SELECT_C1_S1_ROW, 1, "foo"); - checkTraceForDigestMismatch(cluster, 2, SELECT_C1_S1_ROW, 1, "fi"); + testDigestConsistency(cluster.coordinator(1)); + testDigestConsistency(cluster.coordinator(2)); }) .run(); } diff --cc test/unit/org/apache/cassandra/Util.java index ca5ec63,fa24167..8e487a0 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@@ -79,14 -67,11 +79,15 @@@ import org.apache.cassandra.service.Sto import org.apache.cassandra.service.pager.PagingState; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; ++import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FilterFactory; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class Util @@@ -747,59 -712,4 +748,75 @@@ PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion); return new PagingState(pk, mark, 10, remainingInPartition); } + + public static void 
assertRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b) + { + assertTrue(a + " not equal to " + b, Iterables.elementsEqual(a, b)); + } + + public static void assertNotRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b) + { + assertFalse(a + " equal to " + b, Iterables.elementsEqual(a, b)); + } + + /** + * Makes sure that the sstables on disk are the same ones as the cfs live sstables (that they have the same generation) + */ + public static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount) + { + LifecycleTransaction.waitForDeletions(); + assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size()); + Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet()); + int fileCount = 0; + for (File f : cfs.getDirectories().getCFDirectories()) + { + for (File sst : f.listFiles()) + { + if (sst.getName().contains("Data")) + { + Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath()); + assertTrue(liveGenerations.contains(d.generation)); + fileCount++; + } + } + } + assertEquals(expectedSSTableCount, fileCount); + } + + /** + * Disable bloom filter on all sstables of given table + */ + public static void disableBloomFilter(ColumnFamilyStore cfs) + { + Collection<SSTableReader> sstables = cfs.getLiveSSTables(); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + for (SSTableReader sstable : sstables) + { + sstable = sstable.cloneAndReplace(FilterFactory.AlwaysPresent); + txn.update(sstable, true); + txn.checkpoint(); + } + txn.finish(); + } + + for (SSTableReader reader : cfs.getLiveSSTables()) + assertEquals(FilterFactory.AlwaysPresent, reader.getBloomFilter()); + } ++ ++ /** ++ * Sets up Gossiper to mimic the upgrade behaviour when {@link Gossiper#isUpgradingFromVersionLowerThan(CassandraVersion)}
++ */ ++ public static void setUpgradeFromVersion(String version) ++ { ++ int v = Optional.ofNullable(Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort())) ++ .map(ep -> ep.getApplicationState(ApplicationState.RELEASE_VERSION)) ++ .map(rv -> rv.version) ++ .orElse(0); ++ ++ Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, ++ VersionedValue.unsafeMakeVersionedValue(version, v + 1)); ++ Gossiper.instance.expireUpgradeFromVersion(); ++ } } diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java index edfe57a,e86071a..59770b8 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java @@@ -19,23 -19,23 +19,69 @@@ package org.apache.cassandra.cql3.validation.operations; import java.nio.ByteBuffer; ++import java.util.Arrays; ++import java.util.Collection; import java.util.List; ++import org.junit.Before; ++import org.junit.BeforeClass; import org.junit.Test; ++import org.junit.runner.RunWith; ++import org.junit.runners.Parameterized; - import org.apache.cassandra.schema.SchemaConstants; -import org.apache.cassandra.config.SchemaConstants; ++import org.apache.cassandra.Util; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.Duration; ++import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; ++import org.apache.cassandra.gms.Gossiper; ++import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaKeyspace; ++import org.apache.cassandra.utils.CassandraVersion; import static java.lang.String.format; import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; ++@RunWith(Parameterized.class) public 
class InsertUpdateIfConditionTest extends CQLTester { ++ @Parameterized.Parameter(0) ++ public String clusterMinVersion; ++ ++ @Parameterized.Parameter(1) ++ public Runnable assertion; ++ ++ @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}") ++ public static Collection<Object[]> data() ++ { ++ return Arrays.asList(new Object[]{ "3.0", (Runnable) () -> { ++ assertTrue(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("3.11"))); ++ } }, ++ new Object[]{ "3.11", (Runnable) () -> { ++ assertTrue(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("4.0"))); ++ assertFalse(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("3.11"))); ++ } }, ++ new Object[]{ SystemKeyspace.CURRENT_VERSION.toString(), (Runnable) () -> { ++ assertFalse(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("4.0"))); ++ } }); ++ } ++ ++ @BeforeClass ++ public static void beforeClass() ++ { ++ Gossiper.instance.maybeInitializeLocalState(0); ++ } ++ ++ @Before ++ public void before() ++ { ++ Util.setUpgradeFromVersion(clusterMinVersion); ++ assertion.run(); ++ } ++ /** * Migrated from cql_tests.py:TestCQL.cas_simple_test() */ @@@ -1020,7 -1032,7 +1066,7 @@@ } /** -- * Test expanded functionality from CASSANDRA-6839, ++ * Test expanded functionality from CASSANDRA-6839, * migrated from cql_tests.py:TestCQL.expanded_list_item_conditional_test() */ @Test diff --cc test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java index 396a2e9,1ddf039..a96aa57 --- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java +++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java @@@ -18,12 -18,23 +18,20 @@@ package org.apache.cassandra.db.filter; - import static org.junit.Assert.assertEquals; - import static org.junit.Assert.assertFalse; - import static org.junit.Assert.assertNull; - import static org.junit.Assert.assertTrue; + import java.io.IOException; + import java.util.Arrays; + import 
java.util.Collection; + import java.util.function.Consumer; -import com.google.common.base.Throwables; + import org.junit.Assert; + import org.junit.Before; + import org.junit.BeforeClass; + import org.junit.Test; + import org.junit.runner.RunWith; + import org.junit.runners.Parameterized; + -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.PartitionColumns; ++import org.apache.cassandra.Util; +import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.rows.CellPath; @@@ -32,285 -44,404 +41,452 @@@ import org.apache.cassandra.io.util.Dat import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; - import org.junit.Test; ++import org.apache.cassandra.utils.Throwables; - import org.junit.Assert; + import static org.junit.Assert.assertEquals; + @RunWith(Parameterized.class) public class ColumnFilterTest { private static final ColumnFilter.Serializer serializer = new ColumnFilter.Serializer(); - private final CFMetaData metadata = CFMetaData.Builder.create("ks", "table") - .withPartitioner(Murmur3Partitioner.instance) - .addPartitionKey("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addStaticColumn("s1", Int32Type.instance) - .addStaticColumn("s2", SetType.getInstance(Int32Type.instance, true)) - .addRegularColumn("v1", Int32Type.instance) - .addRegularColumn("v2", SetType.getInstance(Int32Type.instance, true)) - .build(); - - private final ColumnDefinition s1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("s1")); - private 
final ColumnDefinition s2 = metadata.getColumnDefinition(ByteBufferUtil.bytes("s2")); - private final ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1")); - private final ColumnDefinition v2 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v2")); ++ @Parameterized.Parameter ++ public String clusterMinVersion; ++ ++ private final TableMetadata metadata = TableMetadata.builder("ks", "table") ++ .partitioner(Murmur3Partitioner.instance) ++ .addPartitionKeyColumn("pk", Int32Type.instance) ++ .addClusteringColumn("ck", Int32Type.instance) ++ .addStaticColumn("s1", Int32Type.instance) ++ .addStaticColumn("s2", SetType.getInstance(Int32Type.instance, true)) ++ .addRegularColumn("v1", Int32Type.instance) ++ .addRegularColumn("v2", SetType.getInstance(Int32Type.instance, true)) ++ .build(); ++ ++ private final ColumnMetadata s1 = metadata.getColumn(ByteBufferUtil.bytes("s1")); ++ private final ColumnMetadata s2 = metadata.getColumn(ByteBufferUtil.bytes("s2")); ++ private final ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1")); ++ private final ColumnMetadata v2 = metadata.getColumn(ByteBufferUtil.bytes("v2")); + private final CellPath path0 = CellPath.create(ByteBufferUtil.bytes(0)); + private final CellPath path1 = CellPath.create(ByteBufferUtil.bytes(1)); + private final CellPath path2 = CellPath.create(ByteBufferUtil.bytes(2)); + private final CellPath path3 = CellPath.create(ByteBufferUtil.bytes(3)); + private final CellPath path4 = CellPath.create(ByteBufferUtil.bytes(4)); + - @Parameterized.Parameter - public boolean anyNodeOn30; + - @Parameterized.Parameters(name = "{index}: anyNodeOn30={0}") ++ @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}") + public static Collection<Object[]> data() + { - return Arrays.asList(new Object[]{ true }, new Object[]{ false }); ++ return Arrays.asList(new Object[]{ "3.0" }, new Object[]{ "3.11" }, new Object[]{ "4.0" }); + } + + @BeforeClass + public static void 
beforeClass() + { - DatabaseDescriptor.clientInitialization(); ++ Gossiper.instance.maybeInitializeLocalState(0); + } + + @Before + public void before() + { - Gossiper.instance.setAnyNodeOn30(anyNodeOn30); ++ Util.setUpgradeFromVersion(clusterMinVersion); + } + + // Select all + + @Test + public void testSelectAll() + { + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("*/*", filter.toString()); + assertFetchedQueried(true, true, filter, v1, v2, s1, s2); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4); + }; + + check.accept(ColumnFilter.all(metadata)); - check.accept(ColumnFilter.allColumnsBuilder(metadata).build()); ++ check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).build()); + } + + // Selections + @Test - public void testColumnFilterSerialisationRoundTrip() throws Exception + public void testSelectNothing() { - TableMetadata metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addRegularColumn("v1", Int32Type.instance) - .addRegularColumn("v2", Int32Type.instance) - .addRegularColumn("v3", Int32Type.instance) - .build(); - - ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1")); - - ColumnFilter columnFilter; - - columnFilter = ColumnFilter.all(metadata); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014); - testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40); - - 
testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30); - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014); - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40); - - columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1)); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014); - testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40); - - // Table with static column - metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addStaticColumn("s1", Int32Type.instance) - .addRegularColumn("v1", Int32Type.instance) - .addRegularColumn("v2", Int32Type.instance) - .addRegularColumn("v3", Int32Type.instance) - .build(); - - v1 = metadata.getColumn(ByteBufferUtil.bytes("v1")); - ColumnMetadata s1 = metadata.getColumn(ByteBufferUtil.bytes("s1")); - - columnFilter = ColumnFilter.all(metadata); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014); - testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40); - - 
testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30); - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014); - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40); - - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_30); - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_3014); - testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_40); - - columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1)); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014); - testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40); - - columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1).without(s1)); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30); - testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014); - testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40); + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("[]", filter.toString()); + assertFetchedQueried(false, 
false, filter, v1, v2, s1, s2); + assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); + }; + - check.accept(ColumnFilter.selection(PartitionColumns.NONE)); ++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.NONE)); + check.accept(ColumnFilter.selectionBuilder().build()); } @Test - public void testColumnFilterConstruction() + public void testSelectSimpleColumn() { - // all regular column - TableMetadata metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addRegularColumn("v1", Int32Type.instance) - .addRegularColumn("v2", Int32Type.instance) - .addRegularColumn("v3", Int32Type.instance) - .build(); - ColumnFilter columnFilter = ColumnFilter.all(metadata); - assertTrue(columnFilter.fetchAllRegulars); - assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched); - assertNull(columnFilter.queried); - assertEquals("*", columnFilter.toString()); - - RegularAndStaticColumns queried = RegularAndStaticColumns.builder() - .add(metadata.getColumn(ByteBufferUtil.bytes("v1"))).build(); - columnFilter = ColumnFilter.selection(queried); - assertFalse(columnFilter.fetchAllRegulars); - assertEquals(queried, columnFilter.fetched); - assertEquals(queried, columnFilter.queried); - assertEquals("v1", columnFilter.toString()); - - // with static column - metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addStaticColumn("sc1", Int32Type.instance) - .addStaticColumn("sc2", Int32Type.instance) - .addRegularColumn("v1", Int32Type.instance) - .build(); - - columnFilter = ColumnFilter.all(metadata); - assertTrue(columnFilter.fetchAllRegulars); - 
assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched); - assertNull(columnFilter.queried); - assertEquals("*", columnFilter.toString()); - - queried = RegularAndStaticColumns.builder() - .add(metadata.getColumn(ByteBufferUtil.bytes("v1"))).build(); - columnFilter = ColumnFilter.selection(metadata, queried); - assertEquals("v1", columnFilter.toString()); - - // only static - metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addStaticColumn("sc", Int32Type.instance) - .build(); - - columnFilter = ColumnFilter.all(metadata); - assertTrue(columnFilter.fetchAllRegulars); - assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched); - assertNull(columnFilter.queried); - assertEquals("*", columnFilter.toString()); - - // with collection type - metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("pk", Int32Type.instance) - .addClusteringColumn("ck", Int32Type.instance) - .addRegularColumn("v1", Int32Type.instance) - .addRegularColumn("set", SetType.getInstance(Int32Type.instance, true)) - .build(); - - columnFilter = ColumnFilter.all(metadata); - assertTrue(columnFilter.fetchAllRegulars); - assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched); - assertNull(columnFilter.queried); - assertEquals("*", columnFilter.toString()); - - columnFilter = ColumnFilter.selectionBuilder().add(metadata.getColumn(ByteBufferUtil.bytes("v1"))) - .select(metadata.getColumn(ByteBufferUtil.bytes("set")), CellPath.create(ByteBufferUtil.bytes(1))) - .build(); - assertEquals("set[1], v1", columnFilter.toString()); + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("[v1]", filter.toString()); + assertFetchedQueried(true, true, filter, v1); + assertFetchedQueried(false, false, filter, v2, s1, s2); + 
assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); + }; + - check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v1).build())); ++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v1).build())); + check.accept(ColumnFilter.selectionBuilder().add(v1).build()); } - private static void testRoundTrip(ColumnFilter columnFilter, TableMetadata metadata, int version) throws Exception + @Test + public void testSelectComplexColumn() { - testRoundTrip(columnFilter, columnFilter, metadata, version); + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("[v2]", filter.toString()); + assertFetchedQueried(true, true, filter, v2); + assertFetchedQueried(false, false, filter, v1, s1, s2); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); + }; + - check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v2).build())); ++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v2).build())); + check.accept(ColumnFilter.selectionBuilder().add(v2).build()); } - private static void testRoundTrip(ColumnFilter columnFilter, ColumnFilter expected, TableMetadata metadata, int version) throws Exception + @Test + public void testSelectStaticColumn() { - DataOutputBuffer output = new DataOutputBuffer(); - serializer.serialize(columnFilter, output, version); - Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position()); - DataInputPlus input = new DataInputBuffer(output.buffer(), false); - ColumnFilter deserialized = serializer.deserialize(input, version, metadata); - Assert.assertEquals(deserialized, expected); + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("[s1]", 
filter.toString()); + assertFetchedQueried(true, true, filter, s1); + assertFetchedQueried(false, false, filter, v1, v2, s2); + assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); + }; + - check.accept(ColumnFilter.selection(PartitionColumns.builder().add(s1).build())); ++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(s1).build())); + check.accept(ColumnFilter.selectionBuilder().add(s1).build()); } - /** - * Tests whether a filter fetches and/or queries columns and cells. - */ @Test - public void testFetchedQueried() + public void testSelectStaticComplexColumn() { - TableMetadata metadata = TableMetadata.builder("ks", "table") - .partitioner(Murmur3Partitioner.instance) - .addPartitionKeyColumn("k", Int32Type.instance) - .addRegularColumn("simple", Int32Type.instance) - .addRegularColumn("complex", SetType.getInstance(Int32Type.instance, true)) - .build(); - - ColumnMetadata simple = metadata.getColumn(ByteBufferUtil.bytes("simple")); - ColumnMetadata complex = metadata.getColumn(ByteBufferUtil.bytes("complex")); - CellPath path1 = CellPath.create(ByteBufferUtil.bytes(1)); - CellPath path2 = CellPath.create(ByteBufferUtil.bytes(2)); - ColumnFilter filter; - - // select only the simple column, without table metadata - filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(simple).build()); - assertFetchedQueried(true, true, filter, simple); - assertFetchedQueried(false, false, filter, complex); - assertFetchedQueried(false, false, filter, complex, path1); - assertFetchedQueried(false, false, filter, complex, path2); - - // select only the complex column, without table metadata - filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(complex).build()); - assertFetchedQueried(false, false, filter, simple); - assertFetchedQueried(true, true, filter, complex); - assertFetchedQueried(true, 
true, filter, complex, path1); - assertFetchedQueried(true, true, filter, complex, path2); - - // select both the simple and complex columns, without table metadata - filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(simple).add(complex).build()); - assertFetchedQueried(true, true, filter, simple); - assertFetchedQueried(true, true, filter, complex); - assertFetchedQueried(true, true, filter, complex, path1); - assertFetchedQueried(true, true, filter, complex, path2); - - // select only the simple column, with table metadata - filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(simple).build()); - assertFetchedQueried(true, true, filter, simple); - assertFetchedQueried(true, false, filter, complex); - assertFetchedQueried(true, false, filter, complex, path1); - assertFetchedQueried(true, false, filter, complex, path2); - - // select only the complex column, with table metadata - filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(complex).build()); - assertFetchedQueried(true, false, filter, simple); - assertFetchedQueried(true, true, filter, complex); - assertFetchedQueried(true, true, filter, complex, path1); - assertFetchedQueried(true, true, filter, complex, path2); - - // select both the simple and complex columns, with table metadata - filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(simple).add(complex).build()); - assertFetchedQueried(true, true, filter, simple); - assertFetchedQueried(true, true, filter, complex); - assertFetchedQueried(true, true, filter, complex, path1); - assertFetchedQueried(true, true, filter, complex, path2); - - // select only the simple column, with selection builder - filter = ColumnFilter.selectionBuilder().add(simple).build(); - assertFetchedQueried(true, true, filter, simple); - assertFetchedQueried(false, false, filter, complex); - assertFetchedQueried(false, false, filter, complex, path1); - assertFetchedQueried(false, 
false, filter, complex, path2); - - // select only a cell of the complex column, with selection builder - filter = ColumnFilter.selectionBuilder().select(complex, path1).build(); - assertFetchedQueried(false, false, filter, simple); - assertFetchedQueried(true, true, filter, complex); - assertFetchedQueried(true, true, filter, complex, path1); - assertFetchedQueried(true, false, filter, complex, path2); - - // select both the simple column and a cell of the complex column, with selection builder - filter = ColumnFilter.selectionBuilder().add(simple).select(complex, path1).build(); - assertFetchedQueried(true, true, filter, simple); - assertFetchedQueried(true, true, filter, complex); - assertFetchedQueried(true, true, filter, complex, path1); - assertFetchedQueried(true, false, filter, complex, path2); + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("[s2]", filter.toString()); + assertFetchedQueried(true, true, filter, s2); + assertFetchedQueried(false, false, filter, v1, v2, s1); + assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4); + }; + - check.accept(ColumnFilter.selection(PartitionColumns.builder().add(s2).build())); ++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(s2).build())); + check.accept(ColumnFilter.selectionBuilder().add(s2).build()); } - private static void assertFetchedQueried(boolean expectedFetched, - boolean expectedQueried, - ColumnFilter filter, - ColumnMetadata column) + @Test + public void testSelectColumns() { - assert !expectedQueried || expectedFetched; - boolean actualFetched = filter.fetches(column); - assertEquals(expectedFetched, actualFetched); - assertEquals(expectedQueried, actualFetched && filter.fetchedColumnIsQueried(column)); + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertEquals("[s1, s2, v1, v2]", 
filter.toString()); + assertFetchedQueried(true, true, filter, v1, v2, s1, s2); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4); + }; + - check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v1).add(v2).add(s1).add(s2).build())); ++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v1).add(v2).add(s1).add(s2).build())); + check.accept(ColumnFilter.selectionBuilder().add(v1).add(v2).add(s1).add(s2).build()); } + @Test + public void testSelectIndividualCells() + { + ColumnFilter filter = ColumnFilter.selectionBuilder().select(v2, path1).select(v2, path3).build(); + testRoundTrips(filter); + assertEquals("[v2[1], v2[3]]", filter.toString()); + assertFetchedQueried(true, true, filter, v2); + assertFetchedQueried(false, false, filter, v1, s1, s2); + assertCellFetchedQueried(true, true, filter, v2, path1, path3); + assertCellFetchedQueried(false, false, filter, v2, path0, path2, path4); + assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); + } + + @Test + public void testSelectIndividualCellsFromStatic() + { + ColumnFilter filter = ColumnFilter.selectionBuilder().select(s2, path1).select(s2, path3).build(); + testRoundTrips(filter); + assertEquals("[s2[1], s2[3]]", filter.toString()); + assertFetchedQueried(true, true, filter, s2); + assertFetchedQueried(false, false, filter, v1, v2, s1); + assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path1, path3); + assertCellFetchedQueried(false, false, filter, s2, path0, path2, path4); + } + + @Test + public void testSelectCellSlice() + { + ColumnFilter filter = ColumnFilter.selectionBuilder().slice(v2, path1, path3).build(); + testRoundTrips(filter); + assertEquals("[v2[1:3]]", filter.toString()); + assertFetchedQueried(true, true, filter, v2); + 
assertFetchedQueried(false, false, filter, v1, s1, s2); + assertCellFetchedQueried(true, true, filter, v2, path1, path2, path3); + assertCellFetchedQueried(false, false, filter, v2, path0, path4); + assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); + } + + @Test + public void testSelectCellSliceFromStatic() + { + ColumnFilter filter = ColumnFilter.selectionBuilder().slice(s2, path1, path3).build(); + testRoundTrips(filter); + assertEquals("[s2[1:3]]", filter.toString()); + assertFetchedQueried(true, true, filter, s2); + assertFetchedQueried(false, false, filter, v1, v2, s1); + assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path1, path2, path3); + assertCellFetchedQueried(false, false, filter, s2, path0, path4); + } + + @Test + public void testSelectColumnsWithCellsAndSlices() + { + ColumnFilter filter = ColumnFilter.selectionBuilder() + .add(v1) + .add(s1) + .slice(v2, path0, path2) + .select(v2, path4) + .select(s2, path0) + .slice(s2, path2, path4) + .build(); + testRoundTrips(filter); + assertEquals("[s1, s2[0], s2[2:4], v1, v2[0:2], v2[4]]", filter.toString()); + assertFetchedQueried(true, true, filter, v1, v2, s1, s2); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path4); + assertCellFetchedQueried(false, false, filter, v2, path3); + assertCellFetchedQueried(true, true, filter, s2, path0, path2, path3, path4); + assertCellFetchedQueried(false, false, filter, s2, path1); + } + + // select with metadata + + @Test + public void testSelectSimpleColumnWithMetadata() + { + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertFetchedQueried(true, true, filter, v1); - if (anyNodeOn30) ++ if ("3.0".equals(clusterMinVersion)) + { + assertEquals("*/*", filter.toString()); + assertFetchedQueried(true, true, filter, s1, s2, v2); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, 
path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4); + } - else ++ else if ("3.11".equals(clusterMinVersion)) + { + assertEquals("*/[v1]", filter.toString()); + assertFetchedQueried(true, false, filter, s1, s2, v2); + assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, false, filter, s2, path0, path1, path2, path3, path4); + } ++ else ++ { ++ assertEquals("<all regulars>/[v1]", filter.toString()); ++ assertFetchedQueried(true, false, filter, v2); ++ assertFetchedQueried(false, false, filter, s1, s2); ++ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4); ++ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); ++ } + }; + - check.accept(ColumnFilter.selection(metadata, PartitionColumns.builder().add(v1).build())); - check.accept(ColumnFilter.allColumnsBuilder(metadata).add(v1).build()); ++ check.accept(ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(v1).build())); ++ check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).add(v1).build()); + } + + @Test + public void testSelectStaticColumnWithMetadata() + { + Consumer<ColumnFilter> check = filter -> { + testRoundTrips(filter); + assertFetchedQueried(true, true, filter, s1); - if (anyNodeOn30) ++ if ("3.0".equals(clusterMinVersion)) + { + assertEquals("*/*", filter.toString()); + assertFetchedQueried(true, true, filter, v1, v2, s2); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4); + } - else ++ else if ("3.11".equals(clusterMinVersion)) + { + assertEquals("*/[s1]", filter.toString()); + assertFetchedQueried(true, false, filter, v1, v2, s2); + assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(false, false, filter, s2, 
path0, path1, path2, path3, path4); + } ++ else ++ { ++ assertEquals("<all regulars>+[s1]/[s1]", filter.toString()); ++ assertFetchedQueried(true, false, filter, v1, v2); ++ assertFetchedQueried(false, false, filter, s2); ++ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4); ++ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); ++ } + }; + - check.accept(ColumnFilter.selection(metadata, PartitionColumns.builder().add(s1).build())); - check.accept(ColumnFilter.allColumnsBuilder(metadata).add(s1).build()); ++ check.accept(ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(s1).build())); ++ check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).add(s1).build()); + } + + @Test + public void testSelectCellWithMetadata() + { - ColumnFilter filter = ColumnFilter.allColumnsBuilder(metadata).select(v2, path1).build(); ++ ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata).select(v2, path1).build(); + testRoundTrips(filter); + assertFetchedQueried(true, true, filter, v2); - if (anyNodeOn30) ++ if ("3.0".equals(clusterMinVersion)) + { + assertEquals("*/*", filter.toString()); + assertFetchedQueried(true, true, filter, s1, s2, v1); + assertCellFetchedQueried(true, true, filter, v2, path1); + assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4); + } - else ++ else if ("3.11".equals(clusterMinVersion)) + { + assertEquals("*/[v2[1]]", filter.toString()); + assertFetchedQueried(true, false, filter, s1, s2, v1); + assertCellFetchedQueried(true, true, filter, v2, path1); + assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4); + assertCellFetchedQueried(true, false, filter, s2, path0, path1, path2, path3, path4); + } ++ else ++ { ++ assertEquals("<all regulars>/[v2[1]]", filter.toString()); ++ assertFetchedQueried(true, false, filter, 
v1); ++ assertFetchedQueried(false, false, filter, s1, s2); ++ assertCellFetchedQueried(true, true, filter, v2, path1); ++ assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4); ++ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4); ++ } + } + + @Test + public void testSelectStaticColumnCellWithMetadata() + { - ColumnFilter filter = ColumnFilter.allColumnsBuilder(metadata).select(s2, path1).build(); ++ ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata).select(s2, path1).build(); + testRoundTrips(filter); + assertFetchedQueried(true, true, filter, s2); - if (anyNodeOn30) ++ if ("3.0".equals(clusterMinVersion)) + { + assertEquals("*/*", filter.toString()); + assertFetchedQueried(true, true, filter, v1, v2, s1); + assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path1); - assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4); ++ assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4); // TODO ??? 
+ } - else ++ else if ("3.11".equals(clusterMinVersion)) + { + assertEquals("*/[s2[1]]", filter.toString()); + assertFetchedQueried(true, false, filter, v1, v2, s1); + assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4); + assertCellFetchedQueried(true, true, filter, s2, path1); + assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4); + } ++ else ++ { ++ assertEquals("<all regulars>+[s2[1]]/[s2[1]]", filter.toString()); ++ assertFetchedQueried(true, false, filter, v1, v2); ++ assertFetchedQueried(false, false, filter, s1); ++ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4); ++ assertCellFetchedQueried(true, true, filter, s2, path1); ++ assertCellFetchedQueried(false, false, filter, s2, path0, path2, path3, path4); ++ } + } + + private void testRoundTrips(ColumnFilter cf) + { + testRoundTrip(cf, MessagingService.VERSION_30); + testRoundTrip(cf, MessagingService.VERSION_3014); ++ testRoundTrip(cf, MessagingService.VERSION_40); + } + + private void testRoundTrip(ColumnFilter columnFilter, int version) + { + try + { + DataOutputBuffer output = new DataOutputBuffer(); + serializer.serialize(columnFilter, output, version); + Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position()); + DataInputPlus input = new DataInputBuffer(output.buffer(), false); + ColumnFilter deserialized = serializer.deserialize(input, version, metadata); - Assert.assertEquals(deserialized, columnFilter); ++ ++ if (!clusterMinVersion.equals("4.0") || version != MessagingService.VERSION_30 || !columnFilter.fetchAllRegulars) ++ { ++ Assert.assertEquals(deserialized, columnFilter); ++ } ++ else ++ { ++ Assert.assertEquals(deserialized.fetched, metadata.regularAndStaticColumns()); ++ } + } + catch (IOException e) + { - throw Throwables.propagate(e); ++ throw Throwables.cleaned(e); + } + } + ++ private static void assertFetchedQueried(boolean expectedFetched, boolean 
expectedQueried, ColumnFilter filter, - ColumnMetadata column, - CellPath path) - ColumnDefinition... columns) ++ ColumnMetadata... columns) + { - for (ColumnDefinition column : columns) ++ for (ColumnMetadata column : columns) + { - assertEquals(String.format("Expected fetches(%s) to be %s", column.name, expectedFetched), ++ assertEquals(String.format("Expected fetches(%s) to be %s", column, expectedFetched), + expectedFetched, filter.fetches(column)); + if (expectedFetched) - assertEquals(String.format("Expected fetchedColumnIsQueried(%s) to be %s", column.name, expectedQueried), ++ assertEquals(String.format("Expected fetchedColumnIsQueried(%s) to be %s", column, expectedQueried), + expectedQueried, filter.fetchedColumnIsQueried(column)); + } + } + + private static void assertCellFetchedQueried(boolean expectedFetched, + boolean expectedQueried, + ColumnFilter filter, - ColumnDefinition column, ++ ColumnMetadata column, + CellPath... paths) { - assert !expectedQueried || expectedFetched; - boolean actualFetched = filter.fetches(column); - assertEquals(expectedFetched, actualFetched); - assertEquals(expectedQueried, actualFetched && filter.fetchedCellIsQueried(column, path)); + ColumnFilter.Tester tester = filter.newTester(column); + + for (CellPath path : paths) + { + int p = ByteBufferUtil.toInt(path.get(0)); + if (expectedFetched) - assertEquals(String.format("Expected fetchedCellIsQueried(%s:%s) to be %s", column.name, p, expectedQueried), ++ assertEquals(String.format("Expected fetchedCellIsQueried(%s:%s) to be %s", column, p, expectedQueried), + expectedQueried, filter.fetchedCellIsQueried(column, path)); + + if (tester != null) + { - assertEquals(String.format("Expected tester.fetches(%s:%s) to be %s", column.name, p, expectedFetched), ++ assertEquals(String.format("Expected tester.fetches(%s:%s) to be %s", column, p, expectedFetched), + expectedFetched, tester.fetches(path)); + if (expectedFetched) - assertEquals(String.format("Expected 
tester.fetchedCellIsQueried(%s:%s) to be %s", column.name, p, expectedQueried), ++ assertEquals(String.format("Expected tester.fetchedCellIsQueried(%s:%s) to be %s", column, p, expectedQueried), + expectedQueried, tester.fetchedCellIsQueried(path)); + } + } } -} +} diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java index 1b17a27,b6b3ffb..74f3dbb --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@@ -38,13 -37,13 +38,15 @@@ import org.apache.cassandra.db.commitlo import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.SeedProvider; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; ++import org.apache.cassandra.utils.CassandraVersion; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; ++import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class GossiperTest @@@ -80,44 -74,7 +82,47 @@@ } @Test - public void testHaveVersion3Nodes() throws Exception - public void testLargeGenerationJump() throws UnknownHostException ++ public void testHasVersion3Nodes() throws Exception + { ++ Gossiper.instance.expireUpgradeFromVersion(); ++ + VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null); + EndpointState es = new EndpointState((HeartBeatState) null); + es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("4.0-SNAPSHOT")); + Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"), es); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1")); + + + es = new 
EndpointState((HeartBeatState) null); + es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.11.3")); + Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"), es); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2")); + - + es = new EndpointState((HeartBeatState) null); + es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.0.0")); + Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"), es); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3")); + - - assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value()); - - Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2")); - Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2")); - - - assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value()); ++ assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0); ++ assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0); ++ assertTrue(Gossiper.instance.hasMajorVersion3Nodes()); + + Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.3")); + Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.3")); + - assertFalse(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value()); ++ assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0); ++ assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0); ++ assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.12")) < 0); ++ assertTrue(Gossiper.instance.hasMajorVersion3Nodes()); ++ ++ 
Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2")); ++ Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2")); + ++ assertNull(Gossiper.instance.upgradeFromVersionSupplier.get().value()); + } + + @Test + public void testLargeGenerationJump() throws UnknownHostException, InterruptedException { Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); try @@@ -235,126 -192,53 +240,126 @@@ } } + // Note: This test might fail if for some reason the node broadcast address is in 127.99.0.0/16 @Test - public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException + public void testReloadSeeds() throws UnknownHostException { - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); - MessagingService.instance().listen(); - Gossiper.instance.start(1); - InetAddress remoteHostAddress = hosts.get(1); - - EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress); - // Set to any 3.0 version - Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14")); + Gossiper gossiper = new Gossiper(false); + List<String> loadedList; + + // Initialize the seed list directly to a known set to start with + gossiper.seeds.clear(); + InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1")); + int nextSize = 4; + List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize); + for (int i = 0; i < nextSize; i ++) + { + gossiper.seeds.add(addr); + nextSeeds.add(addr); + addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address)); + } + Assert.assertEquals(nextSize, gossiper.seeds.size()); + + // Add another unique address to the list + addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address)); + nextSeeds.add(addr); + nextSize++; + 
DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds)); + loadedList = gossiper.reloadSeeds(); + + // Check that the new entry was added + Assert.assertEquals(nextSize, loadedList.size()); + for (InetAddressAndPort a : nextSeeds) + assertTrue(loadedList.contains(a.toString())); + + // Check that the return value of the reloadSeeds matches the content of the getSeeds call + // and that they both match the internal contents of the Gossiper seeds list + Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size()); + for (InetAddressAndPort a : gossiper.seeds) + { + assertTrue(loadedList.contains(a.toString())); + assertTrue(gossiper.getSeeds().contains(a.toString())); + } - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState)); + // Add a duplicate of the last address to the seed provider list + int uniqueSize = nextSize; + nextSeeds.add(addr); + nextSize++; + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds)); + loadedList = gossiper.reloadSeeds(); + + // Check that the number of seed nodes reported hasn't increased + Assert.assertEquals(uniqueSize, loadedList.size()); + for (InetAddressAndPort a : nextSeeds) + assertTrue(loadedList.contains(a.toString())); + + // Create a new list that has no overlaps with the previous list + addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1")); + int disjointSize = 3; + List<InetAddressAndPort> disjointSeeds = new ArrayList<>(disjointSize); + for (int i = 0; i < disjointSize; i ++) + { + disjointSeeds.add(addr); + addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address)); + } + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(disjointSeeds)); + loadedList = gossiper.reloadSeeds(); - // wait until the schema is set - VersionedValue schema = null; - for (int i = 0; i < 10; i++) + // Check that the list now contains exactly the new other list. 
+ Assert.assertEquals(disjointSize, gossiper.getSeeds().size()); + Assert.assertEquals(disjointSize, loadedList.size()); + for (InetAddressAndPort a : disjointSeeds) { - EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0)); - schema = localState.getApplicationState(ApplicationState.SCHEMA); - if (schema != null) - break; - Thread.sleep(1000); + assertTrue(gossiper.getSeeds().contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); } - // schema is set and equals to "alternative" version - assertTrue(schema != null); - assertEquals(schema.value, Schema.instance.getAltVersion().toString()); + // Set the seed node provider to return an empty list + DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new ArrayList<InetAddressAndPort>())); + loadedList = gossiper.reloadSeeds(); + + // Check that the in memory seed node list was not modified + Assert.assertEquals(disjointSize, loadedList.size()); + for (InetAddressAndPort a : disjointSeeds) + assertTrue(loadedList.contains(a.toString())); + + // Change the seed provider to one that throws an unchecked exception + DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider()); + loadedList = gossiper.reloadSeeds(); + + // Check for the expected null response from a reload error - Assert.assertNull(loadedList); ++ assertNull(loadedList); - // Upgrade remote host version to the latest one (3.11) - Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion()); + // Check that the in memory seed node list was not modified and the exception was caught + Assert.assertEquals(disjointSize, gossiper.getSeeds().size()); + for (InetAddressAndPort a : disjointSeeds) + assertTrue(gossiper.getSeeds().contains(a.toString())); + } - Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState)); + static class TestSeedProvider implements SeedProvider + { + private 
List<InetAddressAndPort> seeds; - // wait until the schema change - VersionedValue newSchema = null; - for (int i = 0; i < 10; i++) + TestSeedProvider(List<InetAddressAndPort> seeds) { - EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0)); - newSchema = localState.getApplicationState(ApplicationState.SCHEMA); - if (!schema.value.equals(newSchema.value)) - break; - Thread.sleep(1000); + this.seeds = seeds; } - // schema is changed and equals to real version - assertFalse(schema.value.equals(newSchema.value)); - assertEquals(newSchema.value, Schema.instance.getRealVersion().toString()); + @Override + public List<InetAddressAndPort> getSeeds() + { + return seeds; + } + } + + // A seed provider for testing which throws assertion errors when queried + static class ErrorSeedProvider implements SeedProvider + { + @Override + public List<InetAddressAndPort> getSeeds() + { + assert(false); + return new ArrayList<>(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
