This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit b3ecbf38a3b9bd7dbcafb5caccacda4bc7d356a0 Merge: 781e486 d6beb01 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Thu Apr 16 18:29:05 2020 +0100 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 6 + .../cassandra/config/DatabaseDescriptor.java | 10 + .../cassandra/db/SinglePartitionReadCommand.java | 83 +++--- .../apache/cassandra/repair/RepairRunnable.java | 76 +++++- .../cassandra/service/SnapshotVerbHandler.java | 67 +++++ .../org/apache/cassandra/service/StorageProxy.java | 18 ++ .../cassandra/service/StorageProxyMBean.java | 4 + .../cassandra/service/reads/DataResolver.java | 2 +- .../service/reads/repair/RepairedDataVerifier.java | 80 +++++- .../distributed/test/PreviewRepairTest.java | 106 +++++++- .../distributed/test/RepairDigestTrackingTest.java | 285 ++++++++++++++------- .../distributed/test/SimpleReadWriteTest.java | 101 ++++++++ .../reads/repair/RepairedDataVerifierTest.java | 4 +- 14 files changed, 697 insertions(+), 146 deletions(-) diff --cc CHANGES.txt index 8f19c2a,dbdb779..96eeed4 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,53 -1,9 +1,54 @@@ -3.11.7 +4.0-alpha4 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660) + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597) + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573) + * Improve logging around incremental repair (CASSANDRA-15599) + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688) + * Replace array iterators with get by index (CASSANDRA-15394) + * Minimize BTree iterator allocations (CASSANDRA-15389) + * Add client request size server metrics (CASSANDRA-15704) + * Add additional logging around FileUtils and compaction leftover cleanup (CASSANDRA-15705) + * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh (CASSANDRA-15711) + * Fail incremental repair if an old version sstable is involved 
(CASSANDRA-15612) + * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773) + * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706) + * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390) + * Repair history tables should have TTL and TWCS (CASSANDRA-12701) + * Fix cqlsh erroring out on Python 3.7 due to webbrowser module being absent (CASSANDRA-15572) + * Fix IMH#acquireCapacity() to return correct Outcome when endpoint reserve runs out (CASSANDRA-15607) + * Fix nodetool describering output (CASSANDRA-15682) + * Only track ideal CL failure when request CL met (CASSANDRA-15696) + * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession (CASSANDRA-15672) + * Fix force compaction of wrapping ranges (CASSANDRA-15664) + * Expose repair streaming metrics (CASSANDRA-15656) + * Set now in seconds in the future for validation repairs (CASSANDRA-15655) + * Emit metric on preview repair failure (CASSANDRA-15654) + * Use more appropriate logging levels (CASSANDRA-15661) + * Fixed empty check in TrieMemIndex due to potential state inconsistency in ConcurrentSkipListMap (CASSANDRA-15526) + * Added UnleveledSSTables global and table level metric (CASSANDRA-15620) + * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616, CASSANDRA-15643) + * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600) + * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308) + * Include finalized pending sstables in preview repair (CASSANDRA-15553) + * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271) + * Correct inaccurate logging message (CASSANDRA-15549) + * Unset GREP_OPTIONS (CASSANDRA-14487) + * Update to Python driver 3.21 for cqlsh (CASSANDRA-14872) + * Fix missing Keyspaces in cqlsh describe output (CASSANDRA-15576) + * Fix multi DC 
nodetool status output (CASSANDRA-15305) + * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569) + * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190) + * Improve the description of nodetool listsnapshots command (CASSANDRA-14587) + * allow embedded cassandra launched from a one-jar or uno-jar (CASSANDRA-15494) + * Update hppc library to version 0.8.1 (CASSANDRA-12995) + * Limit the dependencies used by UDFs/UDAs (CASSANDRA-14737) + * Make native_transport_max_concurrent_requests_in_bytes updatable (CASSANDRA-15519) + * Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469) + * Potential Overflow in DatabaseDescriptor Functions That Convert Between KB/MB & Bytes (CASSANDRA-15470) +Merged from 3.11: * Allow sstableloader to use SSL on the native port (CASSANDRA-14904) Merged from 3.0: -======= -3.0.21 + * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690) * Memtable memory allocations may deadlock (CASSANDRA-15367) * Run evictFromMembership in GossipStage (CASSANDRA-15592) Merged from 2.2: diff --cc src/java/org/apache/cassandra/config/Config.java index 2792694,7f28546..957662a --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -415,69 -387,6 +415,75 @@@ public class Confi public volatile boolean back_pressure_enabled = false; public volatile ParameterizedClass back_pressure_strategy; + public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue; + public int repair_command_pool_size = concurrent_validations; + + /** + * When a node first starts up it intially considers all other peers as DOWN and is disconnected from all of them. 
+ * To be useful as a coordinator (and not introduce latency penalties on restart) this node must have successfully + * opened all three internode TCP connections (gossip, small, and large messages) before advertising to clients. + * Due to this, by default, Cassandra will prime these internode TCP connections and wait for all but a single + * node to be DOWN/disconnected in the local datacenter before offering itself as a coordinator, subject to a + * timeout. See CASSANDRA-13993 and CASSANDRA-14297 for more details. + * + * We provide two tunables to control this behavior as some users may want to block until all datacenters are + * available (global QUORUM/EACH_QUORUM), some users may not want to block at all (clients that already work + * around the problem), and some users may want to prime the connections but not delay startup. + * + * block_for_peers_timeout_in_secs: controls how long this node will wait to connect to peers. To completely disable + * any startup connectivity checks set this to -1. To trigger the internode connections but immediately continue + * startup, set this to 0. The default is 10 seconds. + * + * block_for_peers_in_remote_dcs: controls if this node will consider remote datacenters to wait for. The default + * is to _not_ wait on remote datacenters. 
+ */ + public int block_for_peers_timeout_in_secs = 10; + public boolean block_for_peers_in_remote_dcs = false; + + public volatile boolean automatic_sstable_upgrade = false; + public volatile int max_concurrent_automatic_sstable_upgrades = 1; + public boolean stream_entire_sstables = true; + + public volatile AuditLogOptions audit_logging_options = new AuditLogOptions(); + public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions(); + + public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled; + + public volatile boolean diagnostic_events_enabled = false; + + /** + * flags for enabling tracking repaired state of data during reads + * separate flags for range & single partition reads as single partition reads are only tracked + * when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if + * enabled for range reads, all such reads will include repaired data tracking. As this adds + * some overhead, operators may wish to disable it whilst still enabling it for partition reads + */ + public volatile boolean repaired_data_tracking_for_range_reads_enabled = false; + public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false; + /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of + * sync repaired data due to the presence of pending repair sessions, or unrepaired partition + * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed + * mismatches are simply ignored by the coordinator. + * This is purely to allow operators to avoid potential signal:noise issues as these types of + * mismatches are considerably less actionable than their confirmed counterparts. Setting this + * to true only disables the incrementing of the counters when an unconfirmed mismatch is found + * and has no other effect on the collection or processing of the repaired data. 
+ */ + public volatile boolean report_unconfirmed_repaired_data_mismatches = false; ++ /* ++ * If true, when a repaired data mismatch is detected at read time or during a preview repair, ++ * a snapshot request will be issued to each particpating replica. These are limited at the replica level ++ * so that only a single snapshot per-table per-day can be taken via this method. ++ */ ++ public volatile boolean snapshot_on_repaired_data_mismatch = false; + + /** + * number of seconds to set nowInSec into the future when performing validation previews against repaired data + * this (attempts) to prevent a race where validations on different machines are started on different sides of + * a tombstone being compacted away + */ + public volatile int validation_preview_purge_head_start_in_sec = 60 * 60; + /** * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()} */ diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4821653,1e2e1a1..85bfa88 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -2887,124 -2585,6 +2887,134 @@@ public class DatabaseDescripto return backPressureStrategy; } + public static ConsistencyLevel getIdealConsistencyLevel() + { + return conf.ideal_consistency_level; + } + + public static void setIdealConsistencyLevel(ConsistencyLevel cl) + { + conf.ideal_consistency_level = cl; + } + + public static int getRepairCommandPoolSize() + { + return conf.repair_command_pool_size; + } + + public static Config.RepairCommandPoolFullStrategy getRepairCommandPoolFullStrategy() + { + return conf.repair_command_pool_full_strategy; + } + + public static FullQueryLoggerOptions getFullQueryLogOptions() + { + return conf.full_query_logging_options; + } + + public static boolean getBlockForPeersInRemoteDatacenters() + { + return conf.block_for_peers_in_remote_dcs; + } + + public static int getBlockForPeersTimeoutInSeconds() + { + return 
conf.block_for_peers_timeout_in_secs; + } + + public static boolean automaticSSTableUpgrade() + { + return conf.automatic_sstable_upgrade; + } + + public static void setAutomaticSSTableUpgradeEnabled(boolean enabled) + { + if (conf.automatic_sstable_upgrade != enabled) + logger.debug("Changing automatic_sstable_upgrade to {}", enabled); + conf.automatic_sstable_upgrade = enabled; + } + + public static int maxConcurrentAutoUpgradeTasks() + { + return conf.max_concurrent_automatic_sstable_upgrades; + } + + public static void setMaxConcurrentAutoUpgradeTasks(int value) + { + if (conf.max_concurrent_automatic_sstable_upgrades != value) + logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value); + validateMaxConcurrentAutoUpgradeTasksConf(value); + conf.max_concurrent_automatic_sstable_upgrades = value; + } + + private static void validateMaxConcurrentAutoUpgradeTasksConf(int value) + { + if (value < 0) + throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative"); + if (value > getConcurrentCompactors()) + logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors()); + } + + public static AuditLogOptions getAuditLoggingOptions() + { + return conf.audit_logging_options; + } + + public static void setAuditLoggingOptions(AuditLogOptions auditLoggingOptions) + { + conf.audit_logging_options = auditLoggingOptions; + } + + public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy() + { + return conf.corrupted_tombstone_strategy; + } + + public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy) + { + conf.corrupted_tombstone_strategy = strategy; + } + + public static boolean getRepairedDataTrackingForRangeReadsEnabled() + { + return conf.repaired_data_tracking_for_range_reads_enabled; + } + + public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled) + { + 
conf.repaired_data_tracking_for_range_reads_enabled = enabled; + } + + public static boolean getRepairedDataTrackingForPartitionReadsEnabled() + { + return conf.repaired_data_tracking_for_partition_reads_enabled; + } + + public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled) + { + conf.repaired_data_tracking_for_partition_reads_enabled = enabled; + } + ++ public static boolean snapshotOnRepairedDataMismatch() ++ { ++ return conf.snapshot_on_repaired_data_mismatch; ++ } ++ ++ public static void setSnapshotOnRepairedDataMismatch(boolean enabled) ++ { ++ conf.snapshot_on_repaired_data_mismatch = enabled; ++ } ++ + public static boolean reportUnconfirmedRepairedDataMismatches() + { + return conf.report_unconfirmed_repaired_data_mismatches; + } + + public static void reportUnconfirmedRepairedDataMismatches(boolean enabled) + { + conf.report_unconfirmed_repaired_data_mismatches = enabled; + } + public static boolean strictRuntimeChecks() { return strictRuntimeChecks; diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4daad7d,3d6559b..fe440f4 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -618,74 -725,64 +618,71 @@@ public class SinglePartitionReadComman * timestamp(tombstone) > maxTimestamp_s0 * since we necessarily have * timestamp(tombstone) <= maxTimestamp_s1 - * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination - * in one pass, and minimize the number of sstables for which we read a partition tombstone. - */ + * In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone + * elimination in one pass, and minimize the number of sstables for which we read a partition tombstone. 
+ */ + Collections.sort(view.sstables, SSTableReader.maxTimestampDescending); - long mostRecentPartitionTombstone = Long.MIN_VALUE; int nonIntersectingSSTables = 0; - List<SSTableReader> skippedSSTablesWithTombstones = null; + int includedDueToTombstones = 0; + SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); + if (isTrackingRepairedStatus()) + Tracing.trace("Collecting data from sstables and tracking repaired status"); + for (SSTableReader sstable : view.sstables) { // if we've already seen a partition tombstone with a timestamp greater // than the most recent update to this sstable, we can skip it + // if we're tracking repaired status, we mark the repaired digest inconclusive + // as other replicas may not have seen this partition delete and so could include + // data from this sstable (or others) in their digests if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone) + { + inputCollector.markInconclusive(); break; + } - if (!shouldInclude(sstable)) - { - nonIntersectingSSTables++; - if (sstable.mayHaveTombstones()) - { // if sstable has tombstones we need to check after one pass if it can be safely skipped - if (skippedSSTablesWithTombstones == null) - skippedSSTablesWithTombstones = new ArrayList<>(); - skippedSSTablesWithTombstones.add(sstable); - - } - continue; - } - - minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp()); - - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, - // or through the closing of the final merged iterator - UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector); - if (!sstable.isRepaired()) - oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); - - inputCollector.addSSTableIterator(sstable, iter); - mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, - iter.partitionLevelDeletion().markedForDeleteAt()); - } - - int includedDueToTombstones = 0; 
- // Check for sstables with tombstones that are not expired - if (skippedSSTablesWithTombstones != null) - { - for (SSTableReader sstable : skippedSSTablesWithTombstones) + if (shouldInclude(sstable)) { - if (sstable.getMaxTimestamp() <= minTimestamp) - continue; - - @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception, - // or through the closing of the final merged iterator - UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector); if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); + // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + @SuppressWarnings("resource") - UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector); - iterators.add(iter); ++ UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector); + inputCollector.addSSTableIterator(sstable, iter); - includedDueToTombstones++; + mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, + iter.partitionLevelDeletion().markedForDeleteAt()); + } + else + { - + nonIntersectingSSTables++; + // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely + if (sstable.mayHaveTombstones()) + { + // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + @SuppressWarnings("resource") - UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector); ++ UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector); + // if the sstable contains a partition delete, then we must include it regardless of whether it + // shadows any other data seen locally as we can't guarantee that other replicas have seen it + if (!iter.partitionLevelDeletion().isLive()) + { + if (!sstable.isRepaired()) + 
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); - iterators.add(iter); ++ inputCollector.addSSTableIterator(sstable, iter); + includedDueToTombstones++; + mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, + iter.partitionLevelDeletion().markedForDeleteAt()); + } + else + { + iter.close(); + } + } } } + if (Tracing.isTracing()) Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones", nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index c7a6d71,35794e2..ae35aaf --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -17,15 -17,9 +17,16 @@@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; ++import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@@ -49,35 -33,22 +50,40 @@@ import org.apache.commons.lang3.time.Du import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; +import com.codahale.metrics.Timer; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.RepairMetrics; ++import org.apache.cassandra.db.SnapshotCommand; ++import org.apache.cassandra.gms.FailureDetector; ++import 
org.apache.cassandra.net.Message; ++import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.net.Verb; ++import org.apache.cassandra.repair.consistent.SyncStatSummary; ++import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; - import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; - import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.repair.consistent.CoordinatorSession; - import org.apache.cassandra.repair.consistent.SyncStatSummary; import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; ++import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; @@@ -425,262 -306,68 +431,326 @@@ public class RepairRunnable implements hasFailure.compareAndSet(false, true); } } - return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); + return Futures.immediateFuture(null); } - }); - 
Futures.addCallback(anticompactionResult, new FutureCallback<Object>() + }, MoreExecutors.directExecutor()); + Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor()); + } + + /** + * removes dead nodes from common ranges, and exludes ranges left without any participants + */ + @VisibleForTesting + static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force) + { + if (!force) { - public void onSuccess(Object result) + return commonRanges; + } + else + { + List<CommonRange> filtered = new ArrayList<>(commonRanges.size()); + + for (CommonRange commonRange : commonRanges) { - SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); - if (hasFailure.get()) + Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains)); + Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains)); + Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); + + // this node is implicitly a participant in this repair, so a single endpoint is ok here + if (!endpoints.isEmpty()) { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, - "Some repair failed")); + filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges)); } - else + } + Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair"); + return filtered; + } + } + + private void incrementalRepair(UUID parentSession, + long startTime, + boolean forceRepair, + TraceState traceState, + Set<InetAddressAndPort> allNeighbors, + List<CommonRange> commonRanges, + String... 
cfnames) + { + // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted + Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder() + .addAll(allNeighbors) + .add(FBUtilities.getBroadcastAddressAndPort()) + .build(); + + List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair); + + CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair); + ListeningExecutorService executor = createExecutor(); + AtomicBoolean hasFailure = new AtomicBoolean(false); + ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames), + hasFailure); + Collection<Range<Token>> ranges = new HashSet<>(); + for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges)) + { + ranges.addAll(range); + } + Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor()); + } + + private void previewRepair(UUID parentSession, + long startTime, + List<CommonRange> commonRanges, + String... cfnames) + { + + logger.debug("Starting preview repair for {}", parentSession); + // Set up RepairJob executor for this repair command. 
+ ListeningExecutorService executor = createExecutor(); + + final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames); + + Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>() + { + public void onSuccess(List<RepairSessionResult> results) + { + try { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, - "Repair completed successfully")); + if (results == null || results.stream().anyMatch(s -> s == null)) + { + // something failed + fail(null); + return; + } + PreviewKind previewKind = options.getPreviewKind(); + Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE"); + SyncStatSummary summary = new SyncStatSummary(true); + summary.consumeSessionResults(results); + + final String message; + if (summary.isEmpty()) + { + message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; + } + else + { + message = (previewKind == PreviewKind.REPAIRED ? 
"Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString(); + RepairMetrics.previewFailures.inc(); ++ if (previewKind == PreviewKind.REPAIRED) ++ maybeSnapshotReplicas(parentSession, keyspace, results); + } + notification(message); + + success("Repair preview completed successfully"); + } + catch (Throwable t) + { + logger.error("Error completing preview repair", t); + onFailure(t); + } + finally + { + executor.shutdownNow(); } - repairComplete(); } public void onFailure(Throwable t) { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); - SystemDistributedKeyspace.failParentRepair(parentSession, t); - repairComplete(); + notifyError(t); + fail("Error completing preview repair: " + t.getMessage()); + executor.shutdownNow(); } + }, MoreExecutors.directExecutor()); + } + ++ private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results) ++ { ++ if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch()) ++ return; + - private void repairComplete() ++ try ++ { ++ Set<String> mismatchingTables = new HashSet<>(); ++ Set<InetAddressAndPort> nodes = new HashSet<>(); ++ for (RepairSessionResult sessionResult : results) + { - String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, - true, true); - String message = String.format("Repair command #%d finished in %s", cmd, duration); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); - logger.info(message); - if (options.isTraced() && traceState != null) ++ for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults)) + { - for (ProgressListener listener : listeners) - traceState.removeProgressListener(listener); - // Because DebuggableThreadPoolExecutor#afterExecute and this callback - // run in a nondeterministic order (within the same thread), the - // TraceState may have been nulled 
out at this point. The TraceState - // should be traceState, so just set it without bothering to check if it - // actually was nulled out. - Tracing.instance.set(traceState); - Tracing.traceRepair(message); - Tracing.instance.stopSession(); ++ for (SyncStat stat : emptyIfNull(repairResult.stats)) ++ { ++ if (stat.numberOfDifferences > 0) ++ mismatchingTables.add(repairResult.desc.columnFamily); ++ // snapshot all replicas, even if they don't have any differences ++ nodes.add(stat.nodes.coordinator); ++ nodes.add(stat.nodes.peer); ++ } + } - executor.shutdownNow(); + } - }); ++ ++ String snapshotName = RepairedDataVerifier.SnapshottingVerifier.getSnapshotName(); ++ for (String table : mismatchingTables) ++ { ++ // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case) ++ if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName)) ++ { ++ logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}", ++ options.getPreviewKind().logPrefix(parentSession), ++ keyspace, table, snapshotName, nodes) ++ ; ++ Message<SnapshotCommand> message = Message.out(Verb.SNAPSHOT_REQ, new SnapshotCommand(keyspace, ++ table, ++ snapshotName, ++ false)); ++ for (InetAddressAndPort target : nodes) ++ MessagingService.instance().send(message, target); ++ } ++ else ++ { ++ logger.info("{} Not snapshotting {}.{} - snapshot {} exists", ++ options.getPreviewKind().logPrefix(parentSession), ++ keyspace, table, snapshotName); ++ } ++ } ++ } ++ catch (Exception e) ++ { ++ logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e); ++ } + } + - private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? 
extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) ++ private static <T> Iterable<T> emptyIfNull(Iterable<T> iter) + { - for (int i = 0; i < neighborRangeList.size(); i++) ++ if (iter == null) ++ return Collections.emptyList(); ++ return iter; ++ } ++ + private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession, + boolean isIncremental, + ListeningExecutorService executor, + List<CommonRange> commonRanges, + String... cfnames) + { + List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); + + // we do endpoint filtering at the start of an incremental repair, + // so repair sessions shouldn't also be checking liveness + boolean force = options.isForcedRepair() && !isIncremental; + for (CommonRange commonRange : commonRanges) + { + logger.info("Starting RepairSession for {}", commonRange); + RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + commonRange, + keyspace, + options.getParallelism(), + isIncremental, + options.isPullRepair(), + force, + options.getPreviewKind(), + options.optimiseStreams(), + executor, + cfnames); + if (session == null) + continue; + Futures.addCallback(session, new RepairSessionCallback(session), MoreExecutors.directExecutor()); + futures.add(session); + } + return Futures.successfulAsList(futures); + } + + private ListeningExecutorService createExecutor() + { + return MoreExecutors.listeningDecorator(new JMXEnabledThreadPoolExecutor(options.getJobThreads(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("Repair#" + cmd), + "internal")); + } + + private class RepairSessionCallback implements FutureCallback<RepairSessionResult> + { + private final RepairSession session; + + public RepairSessionCallback(RepairSession session) + { + this.session = session; + } + + public void onSuccess(RepairSessionResult result) + { + String 
message = String.format("Repair session %s for range %s finished", session.getId(), + session.ranges().toString()); + logger.info(message); + fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS, + progressCounter.incrementAndGet(), + totalProgress, + message)); + } + + public void onFailure(Throwable t) { - Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i); + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.ranges().toString(), t.getMessage()); + notifyError(new RuntimeException(message, t)); + } + } - if (p.left.containsAll(neighbors)) + private class RepairCompleteCallback implements FutureCallback<Object> + { + final UUID parentSession; + final Collection<Range<Token>> successfulRanges; + final long startTime; + final TraceState traceState; + final AtomicBoolean hasFailure; + final ExecutorService executor; + + public RepairCompleteCallback(UUID parentSession, + Collection<Range<Token>> successfulRanges, + long startTime, + TraceState traceState, + AtomicBoolean hasFailure, + ExecutorService executor) + { + this.parentSession = parentSession; + this.successfulRanges = successfulRanges; + this.startTime = startTime; + this.traceState = traceState; + this.hasFailure = hasFailure; + this.executor = executor; + } + + public void onSuccess(Object result) + { + maybeStoreParentRepairSuccess(successfulRanges); + if (hasFailure.get()) { - p.right.add(range); + fail(null); + } + else + { + success("Repair completed successfully"); + } + executor.shutdownNow(); + } + + public void onFailure(Throwable t) + { + notifyError(t); + fail(t.getMessage()); + executor.shutdownNow(); + } + } + + private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) + { + Set<InetAddressAndPort> endpoints = neighbors.endpoints(); + Set<InetAddressAndPort> transEndpoints = 
neighbors.filter(Replica::isTransient).endpoints(); + + for (CommonRange commonRange : neighborRangeList) + { + if (commonRange.matchesEndpoints(endpoints, transEndpoints)) + { + commonRange.ranges.add(range); return; } } diff --cc src/java/org/apache/cassandra/service/SnapshotVerbHandler.java index cf2872b,a997533..4d8c4df --- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java +++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java @@@ -17,30 -17,30 +17,97 @@@ */ package org.apache.cassandra.service; ++import java.util.concurrent.Executor; ++import java.util.concurrent.Executors; ++ import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SnapshotCommand; import org.apache.cassandra.db.Keyspace; ++import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> { + public static final SnapshotVerbHandler instance = new SnapshotVerbHandler(); ++ public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-"; ++ private static final Executor REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR = Executors.newSingleThreadExecutor(); + private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class); - public void doVerb(MessageIn<SnapshotCommand> message, int id) + public void doVerb(Message<SnapshotCommand> message) { SnapshotCommand command = message.payload; if (command.clear_snapshot) + { Keyspace.clearSnapshot(command.snapshot_name, command.keyspace); + } ++ else if (command.snapshot_name.startsWith(REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX)) ++ { ++ REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR.execute(new 
RepairedDataSnapshotTask(command, message.from())); ++ } else ++ { Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name); - logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from); - MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); ++ } + + logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from()); + MessagingService.instance().send(message.emptyResponse(), message.from()); + } ++ ++ private static class RepairedDataSnapshotTask implements Runnable ++ { ++ final SnapshotCommand command; ++ final InetAddressAndPort from; ++ ++ RepairedDataSnapshotTask(SnapshotCommand command, InetAddressAndPort from) ++ { ++ this.command = command; ++ this.from = from; ++ } ++ ++ public void run() ++ { ++ try ++ { ++ Keyspace ks = Keyspace.open(command.keyspace); ++ if (ks == null) ++ { ++ logger.info("Snapshot request received from {} for {}.{} but keyspace not found", ++ from, ++ command.keyspace, ++ command.column_family); ++ return; ++ } ++ ++ ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family); ++ if (cfs.snapshotExists(command.snapshot_name)) ++ { ++ logger.info("Received snapshot request from {} for {}.{} following repaired data mismatch, " + ++ "but snapshot with tag {} already exists", ++ from, ++ command.keyspace, ++ command.column_family, ++ command.snapshot_name); ++ return; ++ } ++ logger.info("Creating snapshot requested by {} of {}.{} following repaired data mismatch", ++ from, ++ command.keyspace, ++ command.column_family); ++ cfs.snapshot(command.snapshot_name); ++ } ++ catch (IllegalArgumentException e) ++ { ++ logger.warn("Snapshot request received from {} for {}.{} but table not found", ++ from, ++ command.keyspace, ++ command.column_family); ++ } ++ } + } } diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 
59e6de7,15fe938..38e9c44 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -2764,107 -2813,11 +2764,125 @@@ public class StorageProxy implements St return Schema.instance.getNumberOfTables(); } + public String getIdealConsistencyLevel() + { + return DatabaseDescriptor.getIdealConsistencyLevel().toString(); + } + + public String setIdealConsistencyLevel(String cl) + { + ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel(); + ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase()); + DatabaseDescriptor.setIdealConsistencyLevel(newCL); + return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString()); + } + + @Deprecated public int getOtcBacklogExpirationInterval() { - return DatabaseDescriptor.getOtcBacklogExpirationInterval(); + return 0; + } + + @Deprecated + public void setOtcBacklogExpirationInterval(int intervalInMillis) { } + + @Override + public void enableRepairedDataTrackingForRangeReads() + { + DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true); + } + + @Override + public void disableRepairedDataTrackingForRangeReads() + { + DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false); } - public void setOtcBacklogExpirationInterval(int intervalInMillis) { - DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis); + @Override + public boolean getRepairedDataTrackingEnabledForRangeReads() + { + return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled(); + } + + @Override + public void enableRepairedDataTrackingForPartitionReads() + { + DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true); + } + + @Override + public void disableRepairedDataTrackingForPartitionReads() + { + DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false); + } + + @Override + public boolean 
getRepairedDataTrackingEnabledForPartitionReads() + { + return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled(); + } + + @Override + public void enableReportingUnconfirmedRepairedDataMismatches() + { + DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true); + } + + @Override + public void disableReportingUnconfirmedRepairedDataMismatches() + { + DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false); + } + + @Override + public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled() + { + return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(); + } + ++ @Override ++ public boolean getSnapshotOnRepairedDataMismatchEnabled() ++ { ++ return DatabaseDescriptor.snapshotOnRepairedDataMismatch(); ++ } ++ ++ @Override ++ public void enableSnapshotOnRepairedDataMismatch() ++ { ++ DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true); ++ } ++ ++ @Override ++ public void disableSnapshotOnRepairedDataMismatch() ++ { ++ DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false); ++ } ++ + static class PaxosBallotAndContention + { + final UUID ballot; + final int contentions; + + PaxosBallotAndContention(UUID ballot, int contentions) + { + this.ballot = ballot; + this.contentions = contentions; + } + + @Override + public final int hashCode() + { + int hashCode = 31 + (ballot == null ? 
0 : ballot.hashCode()); + return 31 * hashCode * this.contentions; + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof PaxosBallotAndContention)) + return false; + PaxosBallotAndContention that = (PaxosBallotAndContention)o; + // handles nulls properly + return Objects.equals(ballot, that.ballot) && contentions == that.contentions; + } } } diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java index 514feb1,8678dde..e0d2c86 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@@ -66,26 -63,7 +66,30 @@@ public interface StorageProxyMBea public void setOtcBacklogExpirationInterval(int intervalInMillis); /** Returns each live node's schema version */ - public Map<String, List<String>> getSchemaVersions(); + @Deprecated public Map<String, List<String>> getSchemaVersions(); + public Map<String, List<String>> getSchemaVersionsWithPort(); public int getNumberOfTables(); + + public String getIdealConsistencyLevel(); + public String setIdealConsistencyLevel(String cl); + + /** + * Tracking and reporting of variances in the repaired data set across replicas at read time + */ + void enableRepairedDataTrackingForRangeReads(); + void disableRepairedDataTrackingForRangeReads(); + boolean getRepairedDataTrackingEnabledForRangeReads(); + + void enableRepairedDataTrackingForPartitionReads(); + void disableRepairedDataTrackingForPartitionReads(); + boolean getRepairedDataTrackingEnabledForPartitionReads(); + + void enableReportingUnconfirmedRepairedDataMismatches(); + void disableReportingUnconfirmedRepairedDataMismatches(); + boolean getReportingUnconfirmedRepairedDataMismatchesEnabled(); ++ ++ void enableSnapshotOnRepairedDataMismatch(); ++ void disableSnapshotOnRepairedDataMismatch(); ++ boolean getSnapshotOnRepairedDataMismatchEnabled(); } diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java index 45bf918,0000000..251ee28 mode 
100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@@ -1,280 -1,0 +1,280 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.reads; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.collect.Collections2; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder; +import 
org.apache.cassandra.db.transform.Filter; +import org.apache.cassandra.db.transform.FilteredPartitions; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.service.reads.repair.RepairedDataTracker; +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; + +import static com.google.common.collect.Iterables.*; + +public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P> +{ + private final boolean enforceStrictLiveness; + private final ReadRepair<E, P> readRepair; + + public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime) + { + super(command, replicaPlan, queryStartNanoTime); + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); + this.readRepair = readRepair; + } + + public PartitionIterator getData() + { + ReadResponse response = responses.get(0).payload; + return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); + } + + public boolean isDataPresent() + { + return !responses.isEmpty(); + } + + @SuppressWarnings("resource") + public PartitionIterator resolve() + { + // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here + // at the beginning of this method), so grab the response count once and use that through the method. 
+ Collection<Message<ReadResponse>> messages = responses.snapshot(); + assert !any(messages, msg -> msg.payload.isDigestResponse()); + + E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from()), false); + List<UnfilteredPartitionIterator> iters = new ArrayList<>( + Collections2.transform(messages, msg -> msg.payload.makeIterator(command))); + assert replicas.size() == iters.size(); + + // If requested, inspect each response for a digest of the replica's repaired data set + RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus() + ? new RepairedDataTracker(getRepairedDataVerifier(command)) + : null; + if (repairedDataTracker != null) + { + messages.forEach(msg -> { + if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull()) + { + repairedDataTracker.recordDigest(msg.from(), + msg.payload.repairedDataDigest(), + msg.payload.isRepairedDigestConclusive()); + } + }); + } + + /* + * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, + * have more rows than the client requested. To make sure that we still conform to the original limit, + * we apply a top-level post-reconciliation counter to the merged partition iterator. + * + * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied + * to the current partition to work. For this reason we have to apply the counter transformation before + * empty partition discard logic kicks in - for it will eagerly consume the iterator. + * + * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions + * + * See CASSANDRA-13747 for more details. 
+ */ + DataLimits.Counter mergedResultCounter = + command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); + + UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, + replicaPlan.getWithContacts(replicas), + mergedResultCounter, + repairedDataTracker); + FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); + PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); + return Transformation.apply(counted, new EmptyPartitionsDiscarder()); + } + + protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command) + { - return RepairedDataVerifier.simple(command); ++ return RepairedDataVerifier.verifier(command); + } + + private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, + P sources, + DataLimits.Counter mergedResultCounter, + RepairedDataTracker repairedDataTracker) + { + // If we have only one results, there is no read repair to do, we can't get short + // reads and we can't make a comparison between repaired data sets + if (results.size() == 1) + return results.get(0); + + /* + * So-called short reads stems from nodes returning only a subset of the results they have due to the limit, + * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother. 
+ */ + if (!command.limits().isUnlimited()) + for (int i = 0; i < results.size(); i++) + results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness)); + + return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker)); + } + + private String makeResponsesDebugString(DecoratedKey partitionKey) + { + return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey))); + } + + private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, + P sources, + RepairedDataTracker repairedDataTracker) + { + // Avoid wrapping no-op listener as it doesn't throw, unless we're tracking repaired status + // in which case we need to inject the tracker & verify on close + if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP) + { + if (repairedDataTracker == null) + return partitionListener; + + return new UnfilteredPartitionIterators.MergeListener() + { + + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + return UnfilteredRowIterators.MergeListener.NOOP; + } + + public void close() + { + repairedDataTracker.verify(); + } + }; + } + + return new UnfilteredPartitionIterators.MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions); + + return new UnfilteredRowIterators.MergeListener() + { + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + try + { + rowListener.onMergedPartitionLevelDeletion(mergedDeletion, 
versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + mergedDeletion == null ? "null" : mergedDeletion.toString(), + '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']', + sources.contacts(), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRows(Row merged, Row[] versions) + { + try + { + rowListener.onMergedRows(merged, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + sources.contacts(), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + try + { + // The code for merging range tombstones is a tad complex and we had the assertions there triggered + // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights + // when that happen without more context that what the assertion errors give us however, hence the + // catch here that basically gather as much as context as reasonable. 
+ rowListener.onMergedRangeTombstoneMarkers(merged, versions); + } + catch (AssertionError e) + { + + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + sources.contacts(), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + + } + + public void close() + { + rowListener.close(); + } + }; + } + + public void close() + { + partitionListener.close(); + if (repairedDataTracker != null) + repairedDataTracker.verify(); + } + }; + } +} diff --cc src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java index 816fe9f,0000000..bed240c mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java @@@ -1,95 -1,0 +1,157 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + ++import java.time.LocalDate; ++import java.time.format.DateTimeFormatter; ++import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; - import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; - import org.apache.cassandra.db.SinglePartitionReadCommand; ++import org.apache.cassandra.db.SnapshotCommand; ++import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.TableMetrics; ++import org.apache.cassandra.net.Message; ++import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.net.Verb; ++import org.apache.cassandra.schema.TableId; ++import org.apache.cassandra.service.SnapshotVerbHandler; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.NoSpamLogger; + +public interface RepairedDataVerifier +{ + public void verify(RepairedDataTracker tracker); + ++ static RepairedDataVerifier verifier(ReadCommand command) ++ { ++ return DatabaseDescriptor.snapshotOnRepairedDataMismatch() ? 
snapshotting(command) : simple(command); ++ } ++ + static RepairedDataVerifier simple(ReadCommand command) + { + return new SimpleVerifier(command); + } + ++ static RepairedDataVerifier snapshotting(ReadCommand command) ++ { ++ return new SnapshottingVerifier(command); ++ } ++ + static class SimpleVerifier implements RepairedDataVerifier + { + private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class); - private final ReadCommand command; ++ protected final ReadCommand command; + + private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}"; + + SimpleVerifier(ReadCommand command) + { + this.command = command; + } + + @Override + public void verify(RepairedDataTracker tracker) + { + Tracing.trace("Verifying repaired data tracker {}", tracker); + + // some mismatch occurred between the repaired datasets on the replicas + if (tracker.digests.keySet().size() > 1) + { + // if any of the digests should be considered inconclusive, because there were + // pending repair sessions which had not yet been committed or unrepaired partition + // deletes which meant some sstables were skipped during reads, mark the inconsistency + // as confirmed + if (tracker.inconclusiveDigests.isEmpty()) + { + TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id); + metrics.confirmedRepairedInconsistencies.mark(); + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, + INCONSISTENCY_WARNING, command.metadata().keyspace, - command.metadata().name, getCommandString(), tracker); ++ command.metadata().name, command.toString(), tracker); + } + else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches()) + { + TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id); + metrics.unconfirmedRepairedInconsistencies.mark(); + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, + INCONSISTENCY_WARNING, 
command.metadata().keyspace, - command.metadata().name, getCommandString(), tracker); ++ command.metadata().name, command.toString(), tracker); + } + } + } ++ } ++ ++ static class SnapshottingVerifier extends SimpleVerifier ++ { ++ private static final Logger logger = LoggerFactory.getLogger(SnapshottingVerifier.class); ++ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE; ++ private static final String SNAPSHOTTING_WARNING = "Issuing snapshot command for mismatch between repaired datasets for table {}.{} during read of {}. {}"; ++ ++ // Issue at most 1 snapshot request per minute for any given table. ++ // Replicas will only create one snapshot per day, but this stops us ++ // from swamping the network if we start seeing mismatches. ++ private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1); ++ private static final ConcurrentHashMap<TableId, AtomicLong> LAST_SNAPSHOT_TIMES = new ConcurrentHashMap<>(); ++ ++ SnapshottingVerifier(ReadCommand command) ++ { ++ super(command); ++ } + - private String getCommandString() ++ public void verify(RepairedDataTracker tracker) + { - return command instanceof SinglePartitionReadCommand - ? 
((SinglePartitionReadCommand)command).partitionKey().toString() - : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType); ++ super.verify(tracker); ++ if (tracker.digests.keySet().size() > 1) ++ { ++ if (tracker.inconclusiveDigests.isEmpty() || DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches()) ++ { ++ long now = System.nanoTime(); ++ AtomicLong cached = LAST_SNAPSHOT_TIMES.computeIfAbsent(command.metadata().id, u -> new AtomicLong(0)); ++ long last = cached.get(); ++ if (now - last > SNAPSHOT_INTERVAL_NANOS && cached.compareAndSet(last, now)) ++ { ++ logger.warn(SNAPSHOTTING_WARNING, command.metadata().keyspace, command.metadata().name, command.toString(), tracker); ++ Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ, ++ new SnapshotCommand(command.metadata().keyspace, ++ command.metadata().name, ++ getSnapshotName(), ++ false)); ++ for (InetAddressAndPort replica : tracker.digests.values()) ++ MessagingService.instance().send(msg, replica); ++ } ++ } ++ } ++ } + ++ public static String getSnapshotName() ++ { ++ return String.format("%s%s", ++ SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX, ++ DATE_FORMAT.format(LocalDate.now())); + } + } +} ++ diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java index 30c8f25,0000000..70b40bc mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java @@@ -1,281 -1,0 +1,383 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.ArrayList; ++import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableList; ++import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; ++import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.api.IMessage; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.shared.RepairResult; +import org.apache.cassandra.net.Verb; ++import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.RepairParallelism; +import 
org.apache.cassandra.repair.messages.RepairOption; ++import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.progress.ProgressEventType; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PreviewRepairTest extends TestBaseImpl +{ + /** + * makes sure that the repaired sstables are not matching on the two + * nodes by disabling autocompaction on node2 and then running an + * incremental repair + */ + @Test + public void testWithMismatchingPending() throws Throwable + { + try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start())) + { + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); + insert(cluster.coordinator(1), 0, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + cluster.get(1).callOnInstance(repair(options(false))); + insert(cluster.coordinator(1), 100, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + + // make sure that all sstables have moved to repaired by triggering a compaction + // also disables autocompaction on the nodes + cluster.forEach((node) -> node.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs)); + cfs.disableAutoCompaction(); + })); + cluster.get(1).callOnInstance(repair(options(false))); + // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired + cluster.get(1).runOnInstance(() -> { + 
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + cfs.enableAutoCompaction(); + FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs)); + }); + RepairResult rs = cluster.get(1).callOnInstance(repair(options(true))); + assertTrue(rs.success); // preview repair should succeed + assertFalse(rs.wasInconsistent); // and we should see no mismatches + } + } + + /** + * another case where the repaired datasets could mismatch is if an incremental repair finishes just as the preview + * repair is starting up. + * + * This tests this case: + * 1. we start a preview repair + * 2. pause the validation requests from node1 -> node2 + * 3. node1 starts its validation + * 4. run an incremental repair which completes fine + * 5. node2 resumes its validation + * + * Now we will include sstables from the second incremental repair on node2 but not on node1 + * This should fail since we fail any preview repair which is ongoing when an incremental repair finishes (step 4 above) + */ + @Test + public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException + { + ExecutorService es = Executors.newSingleThreadExecutor(); + try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start())) + { + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); + + insert(cluster.coordinator(1), 0, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + cluster.get(1).callOnInstance(repair(options(false))); + + insert(cluster.coordinator(1), 100, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + + SimpleCondition continuePreviewRepair = new SimpleCondition(); + DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair); + // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below + 
cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop(); + + Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true)))); + Thread.sleep(1000); + // this needs to finish before the preview repair is unpaused on node2 + cluster.get(1).callOnInstance(repair(options(false))); + continuePreviewRepair.signalAll(); + RepairResult rs = rsFuture.get(); + assertFalse(rs.success); // preview repair should have failed + assertFalse(rs.wasInconsistent); // and no mismatches should have been reported + } + finally + { + es.shutdown(); + } + } + + /** + * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair + * so both preview and incremental repair should finish fine (without any mismatches) + */ + @Test + public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException + { + ExecutorService es = Executors.newSingleThreadExecutor(); + try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start())) + { + cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); + + insert(cluster.coordinator(1), 0, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success); + + insert(cluster.coordinator(1), 100, 100); + cluster.forEach((node) -> node.flush(KEYSPACE)); + + // pause preview repair validation messages on node2 until node1 has finished + SimpleCondition continuePreviewRepair = new SimpleCondition(); + DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair); + cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop(); + + // get local ranges to repair two separate ranges: + List<String> localRanges = cluster.get(1).callOnInstance(() -> { + List<String> res = new 
ArrayList<>(); + for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges()) + res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue()); + return res; + }); + + assertEquals(2, localRanges.size()); + Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0))))); + Thread.sleep(1000); // wait for node1 to start validation compaction + // this needs to finish before the preview repair is unpaused on node2 + assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success); + + continuePreviewRepair.signalAll(); + RepairResult rs = repairStatusFuture.get(); + assertTrue(rs.success); // repair should succeed + assertFalse(rs.wasInconsistent); // and no mismatches + } + finally + { + es.shutdown(); + } + } + - private static class DelayMessageFilter implements IMessageFilters.Matcher ++ @Test ++ public void snapshotTest() throws IOException, InterruptedException ++ { ++ try(Cluster cluster = init(Cluster.build(3).withConfig(config -> ++ config.set("snapshot_on_repaired_data_mismatch", true) ++ .with(GOSSIP) ++ .with(NETWORK)) ++ .start())) ++ { ++ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)"); ++ cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)"); ++ Thread.sleep(1000); ++ ++ // populate 2 tables ++ insert(cluster.coordinator(1), 0, 100, "tbl"); ++ insert(cluster.coordinator(1), 0, 100, "tbl2"); ++ cluster.forEach((n) -> n.flush(KEYSPACE)); ++ ++ // make sure everything is marked repaired ++ cluster.get(1).callOnInstance(repair(options(false))); ++ waitMarkedRepaired(cluster); ++ // make node2 mismatch ++ unmarkRepaired(cluster.get(2), "tbl"); ++ verifySnapshots(cluster, "tbl", true); ++ verifySnapshots(cluster, "tbl2", true); ++ ++ AtomicInteger snapshotMessageCounter = new AtomicInteger(); ++ cluster.filters().verbs(Verb.SNAPSHOT_REQ.id).messagesMatching((from, to, 
message) -> { ++ snapshotMessageCounter.incrementAndGet(); ++ return false; ++ }).drop(); ++ cluster.get(1).callOnInstance(repair(options(true))); ++ verifySnapshots(cluster, "tbl", false); ++ // tbl2 should not have a mismatch, so the snapshots should be empty here ++ verifySnapshots(cluster, "tbl2", true); ++ assertEquals(3, snapshotMessageCounter.get()); ++ ++ // and make sure that we don't try to snapshot again ++ snapshotMessageCounter.set(0); ++ cluster.get(3).callOnInstance(repair(options(true))); ++ assertEquals(0, snapshotMessageCounter.get()); ++ } ++ } ++ ++ private void waitMarkedRepaired(Cluster cluster) ++ { ++ cluster.forEach(node -> node.runOnInstance(() -> { ++ for (String table : Arrays.asList("tbl", "tbl2")) ++ { ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table); ++ while (true) ++ { ++ if (cfs.getLiveSSTables().stream().allMatch(SSTableReader::isRepaired)) ++ return; ++ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); ++ } ++ } ++ })); ++ } ++ ++ private void unmarkRepaired(IInvokableInstance instance, String table) ++ { ++ instance.runOnInstance(() -> { ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table); ++ try ++ { ++ cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null, false); ++ } ++ catch (IOException e) ++ { ++ throw new RuntimeException(e); ++ } ++ }); ++ } ++ ++ private void verifySnapshots(Cluster cluster, String table, boolean shouldBeEmpty) ++ { ++ cluster.forEach(node -> node.runOnInstance(() -> { ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table); ++ if(shouldBeEmpty) ++ { ++ assertTrue(cfs.getSnapshotDetails().isEmpty()); ++ } ++ else ++ { ++ while (cfs.getSnapshotDetails().isEmpty()) ++ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); ++ } ++ })); ++ } ++ ++ static class DelayMessageFilter implements IMessageFilters.Matcher + { + private final 
SimpleCondition condition; + private final AtomicBoolean waitForRepair = new AtomicBoolean(true); + + public DelayMessageFilter(SimpleCondition condition) + { + this.condition = condition; + } + public boolean matches(int from, int to, IMessage message) + { + try + { + // only the first validation req should be delayed: + if (waitForRepair.compareAndSet(true, false)) + condition.await(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + return false; // don't drop the message + } + } + + private static void insert(ICoordinator coordinator, int start, int count) + { ++ insert(coordinator, start, count, "tbl"); ++ } ++ ++ static void insert(ICoordinator coordinator, int start, int count, String table) ++ { + for (int i = start; i < start + count; i++) - coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i); ++ coordinator.execute("insert into " + KEYSPACE + "." + table + " (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i); + } + + /** + * returns a pair with [repair success, was inconsistent] + */ + private static IIsolatedExecutor.SerializableCallable<RepairResult> repair(Map<String, String> options) + { + return () -> { + SimpleCondition await = new SimpleCondition(); + AtomicBoolean success = new AtomicBoolean(true); + AtomicBoolean wasInconsistent = new AtomicBoolean(false); + StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> { + if (event.getType() == ProgressEventType.ERROR) + { + success.set(false); + await.signalAll(); + } + else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent")) + { + wasInconsistent.set(true); + } + else if (event.getType() == ProgressEventType.COMPLETE) + await.signalAll(); + })); + try + { + await.await(1, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + return new RepairResult(success.get(), wasInconsistent.get()); + 
}; + } + + private static Map<String, String> options(boolean preview) + { + Map<String, String> config = new HashMap<>(); + config.put(RepairOption.INCREMENTAL_KEY, "true"); + config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString()); + if (preview) + config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString()); + return config; + } + + private static Map<String, String> options(boolean preview, String range) + { + Map<String, String> options = options(preview); + options.put(RepairOption.RANGES_KEY, range); + return options; + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java index 4e44543,0000000..664c99d mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java @@@ -1,205 -1,0 +1,316 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; - import java.io.Serializable; ++import java.time.LocalDate; ++import java.time.format.DateTimeFormatter; +import java.util.EnumSet; ++import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + ++import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; +import org.junit.Test; + ++import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; ++import org.apache.cassandra.distributed.api.IInvokableInstance; ++import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.service.ActiveRepairService; ++import org.apache.cassandra.service.SnapshotVerbHandler; +import org.apache.cassandra.service.StorageProxy; + - public class RepairDigestTrackingTest extends TestBaseImpl implements Serializable// TODO: why serializable? ++public class RepairDigestTrackingTest extends TestBaseImpl +{ ++ private static final String TABLE = "tbl"; ++ private static final String KS_TABLE = KEYSPACE + "." 
+ TABLE; + + @Test + public void testInconsistenciesFound() throws Throwable + { + try (Cluster cluster = (Cluster) init(builder().withNodes(2).start())) + { + + cluster.get(1).runOnInstance(() -> { + StorageProxy.instance.enableRepairedDataTrackingForRangeReads(); + }); + - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'"); ++ cluster.schemaChange("CREATE TABLE " + KS_TABLE+ " (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'"); + for (int i = 0; i < 10; i++) + { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", ++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + i, i, i); + } - - cluster.get(1).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush() - ); - cluster.get(2).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush() - ); ++ cluster.forEach(i -> i.flush(KEYSPACE)); + + for (int i = 10; i < 20; i++) + { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", ++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + i, i, i); + } ++ cluster.forEach(i -> i.flush(KEYSPACE)); ++ cluster.forEach(i -> assertNotRepaired()); ++ ++ // mark everything on node 2 repaired ++ cluster.get(2).runOnInstance(markAllRepaired()); ++ cluster.get(2).runOnInstance(assertRepaired()); + - cluster.get(1).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush() - ); - cluster.get(2).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush() - ); - - cluster.get(1).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired) - ); - cluster.get(2).runOnInstance(() -> - 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired) - ); - - cluster.get(2).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::markRepaired) - ); - - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", 5, 5, 55); - cluster.get(1).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired) - ); - cluster.get(2).runOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertRepaired) - ); - - long ccBefore = cluster.get(1).callOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount() - ); - - cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); - long ccAfter = cluster.get(1).callOnInstance(() -> - Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount() - ); ++ // insert more data on node1 to generate an initial mismatch ++ cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", 5, 5, 55); ++ cluster.get(1).runOnInstance(assertNotRepaired()); + ++ long ccBefore = getConfirmedInconsistencies(cluster.get(1)); ++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL); ++ long ccAfter = getConfirmedInconsistencies(cluster.get(1)); + Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter); + } + } + + @Test + public void testPurgeableTombstonesAreIgnored() throws Throwable + { + try (Cluster cluster = (Cluster) init(builder().withNodes(2).start())) + { - + cluster.get(1).runOnInstance(() -> { + StorageProxy.instance.enableRepairedDataTrackingForRangeReads(); + }); + - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 
INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0"); ++ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0"); + // on node1 only insert some tombstones, then flush + for (int i = 0; i < 10; i++) + { - cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i); ++ cluster.get(1).executeInternal("DELETE v1 FROM " + KS_TABLE + " USING TIMESTAMP 0 WHERE k=? and c=? ", i, i); + } - cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush()); ++ cluster.get(1).flush(KEYSPACE); + + // insert data on both nodes and flush + for (int i = 0; i < 10; i++) + { - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1", ++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, + i, i, i); + } - cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush()); - cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush()); ++ cluster.forEach(i -> i.flush(KEYSPACE)); + + // nothing is repaired yet - cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired)); - cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired)); ++ cluster.forEach(i -> assertNotRepaired()); + // mark everything repaired - cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired)); - cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired)); - cluster.get(1).runOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired)); - cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired)); ++ cluster.forEach(i -> markAllRepaired()); ++ cluster.forEach(i -> assertRepaired()); + + // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected + for (int i = 0; i < 10; i++) + { - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2); ++ cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2); + } + - long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount()); ++ long ccBefore = getConfirmedInconsistencies(cluster.get(1)); + // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones + TimeUnit.SECONDS.sleep(2); - cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL); - long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount()); ++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL); ++ long ccAfter = getConfirmedInconsistencies(cluster.get(1)); + + Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter); + } + } + - private void assertNotRepaired(SSTableReader reader) { - Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE); - } ++ @Test ++ public void testSnapshottingOnInconsistency() throws Throwable ++ { ++ try (Cluster cluster = init(Cluster.create(2))) ++ { ++ 
cluster.get(1).runOnInstance(() -> { ++ StorageProxy.instance.enableRepairedDataTrackingForPartitionReads(); ++ }); ++ ++ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))"); ++ for (int i = 0; i < 10; i++) ++ { ++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", ++ ConsistencyLevel.ALL, i, i); ++ } ++ cluster.forEach(c -> c.flush(KEYSPACE)); + - private void assertRepaired(SSTableReader reader) { - Assert.assertTrue("repaired at is not set for sstable: " + reader.descriptor, getRepairedAt(reader) > 0); ++ for (int i = 10; i < 20; i++) ++ { ++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", ++ ConsistencyLevel.ALL, i, i); ++ } ++ cluster.forEach(c -> c.flush(KEYSPACE)); ++ cluster.forEach(i -> i.runOnInstance(assertNotRepaired())); ++ // Mark everything repaired on node2 ++ cluster.get(2).runOnInstance(markAllRepaired()); ++ cluster.get(2).runOnInstance(assertRepaired()); ++ ++ // now overwrite on node1 only to generate digest mismatches ++ cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 55); ++ cluster.get(1).runOnInstance(assertNotRepaired()); ++ ++ // Execute a partition read and assert inconsistency is detected (as nothing is repaired on node1) ++ long ccBefore = getConfirmedInconsistencies(cluster.get(1)); ++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL); ++ long ccAfter = getConfirmedInconsistencies(cluster.get(1)); ++ Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 1, ccAfter); ++ ++ String snapshotName = SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX ++ + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now()); ++ ++ cluster.forEach(i -> i.runOnInstance(assertSnapshotNotPresent(snapshotName))); ++ ++ // re-introduce a mismatch, enable snapshotting and try again ++ 
cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555); ++ cluster.get(1).runOnInstance(() -> { ++ StorageProxy.instance.enableSnapshotOnRepairedDataMismatch(); ++ }); ++ ++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL); ++ ccAfter = getConfirmedInconsistencies(cluster.get(1)); ++ Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 2, ccAfter); ++ ++ cluster.forEach(i -> i.runOnInstance(assertSnapshotPresent(snapshotName))); ++ } + } + - private long getRepairedAt(SSTableReader reader) ++ private IIsolatedExecutor.SerializableRunnable assertNotRepaired() + { - Descriptor descriptor = reader.descriptor; - try ++ return () -> + { - Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer() - .deserialize(descriptor, EnumSet.of(MetadataType.STATS)); ++ try ++ { ++ Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE) ++ .getColumnFamilyStore(TABLE) ++ .getLiveSSTables() ++ .iterator(); ++ while (sstables.hasNext()) ++ { ++ SSTableReader sstable = sstables.next(); ++ Descriptor descriptor = sstable.descriptor; ++ Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer() ++ .deserialize(descriptor, EnumSet.of(MetadataType.STATS)); ++ ++ StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS); ++ Assert.assertEquals("repaired at is set for sstable: " + descriptor, ++ stats.repairedAt, ++ ActiveRepairService.UNREPAIRED_SSTABLE); ++ } ++ } catch (IOException e) { ++ throw new RuntimeException(e); ++ } ++ }; ++ } + - StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS); - return stats.repairedAt; - } catch (IOException e) { - throw new RuntimeException(e); - } ++ private IIsolatedExecutor.SerializableRunnable markAllRepaired() ++ { ++ return () -> ++ { ++ try ++ { ++ Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE) ++ 
.getColumnFamilyStore(TABLE) ++ .getLiveSSTables() ++ .iterator(); ++ while (sstables.hasNext()) ++ { ++ SSTableReader sstable = sstables.next(); ++ Descriptor descriptor = sstable.descriptor; ++ descriptor.getMetadataSerializer() ++ .mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false); ++ sstable.reloadSSTableMetadata(); ++ } ++ } catch (IOException e) { ++ throw new RuntimeException(e); ++ } ++ }; ++ } + ++ private IIsolatedExecutor.SerializableRunnable assertRepaired() ++ { ++ return () -> ++ { ++ try ++ { ++ Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE) ++ .getColumnFamilyStore(TABLE) ++ .getLiveSSTables() ++ .iterator(); ++ while (sstables.hasNext()) ++ { ++ SSTableReader sstable = sstables.next(); ++ Descriptor descriptor = sstable.descriptor; ++ Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer() ++ .deserialize(descriptor, EnumSet.of(MetadataType.STATS)); ++ ++ StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS); ++ Assert.assertTrue("repaired at is not set for sstable: " + descriptor, stats.repairedAt > 0); ++ } ++ } ++ catch (IOException e) ++ { ++ throw new RuntimeException(e); ++ } ++ }; + } + - private void markRepaired(SSTableReader reader) { - Descriptor descriptor = reader.descriptor; - try ++ private IInvokableInstance.SerializableRunnable assertSnapshotPresent(String snapshotName) ++ { ++ return () -> + { - descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false); - reader.reloadSSTableMetadata(); - } catch (IOException e) { - throw new RuntimeException(e); - } ++ // snapshots are taken asynchronously, this is crude but it gives it a chance to happen ++ int attempts = 100; ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); ++ ++ while (cfs.getSnapshotDetails().isEmpty()) ++ { ++ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); ++ if (attempts-- < 0) ++ throw new 
AssertionError(String.format("Snapshot %s not found for for %s", snapshotName, KS_TABLE)); ++ } ++ }; ++ } + ++ private IInvokableInstance.SerializableRunnable assertSnapshotNotPresent(String snapshotName) ++ { ++ return () -> ++ { ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); ++ Assert.assertFalse(cfs.snapshotExists(snapshotName)); ++ }; + } + ++ private long getConfirmedInconsistencies(IInvokableInstance instance) ++ { ++ return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) ++ .getColumnFamilyStore(TABLE) ++ .metric ++ .confirmedRepairedInconsistencies ++ .table ++ .getCount()); ++ } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 226331c,75e5ba9..a547c76 --- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@@ -1,23 -1,7 +1,25 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.distributed.test; + import java.util.Set; + import org.junit.Assert; import org.junit.Test; @@@ -27,17 -10,14 +29,18 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.metrics.ReadRepairMetrics; + import org.apache.cassandra.io.sstable.format.SSTableReader; -import static org.junit.Assert.assertEquals; - +import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.shared.AssertUtils.*; +import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD; +import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ; +import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP; +import static org.junit.Assert.fail; // TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository -public class SimpleReadWriteTest extends SharedClusterTestBase +public class SimpleReadWriteTest extends TestBaseImpl { @Test public void coordinatorReadTest() throws Throwable @@@ -392,24 -258,118 +395,122 @@@ @Test public void metricsCountQueriesTest() throws Throwable { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - for (int i = 0; i < 100; i++) - cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i); - - long readCount1 = readCount((IInvokableInstance) cluster.get(1)); - long readCount2 = readCount((IInvokableInstance) cluster.get(2)); - for (int i = 0; i < 100; i++) - cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? 
and ck = ?", ConsistencyLevel.ALL, i, i); - - readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1; - readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2; - assertEquals(readCount1, readCount2); - assertEquals(100, readCount1); + try (ICluster<IInvokableInstance> cluster = init(Cluster.create(2))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + for (int i = 0; i < 100; i++) + cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i); + + long readCount1 = readCount(cluster.get(1)); + long readCount2 = readCount(cluster.get(2)); + for (int i = 0; i < 100; i++) + cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i); + + readCount1 = readCount(cluster.get(1)) - readCount1; + readCount2 = readCount(cluster.get(2)) - readCount2; + Assert.assertEquals(readCount1, readCount2); + Assert.assertEquals(100, readCount1); + } } + + @Test + public void skippedSSTableWithPartitionDeletionTest() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"); + // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp + cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0"); + // and a row from a different partition, to provide the sstable's min/max clustering + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2"); + cluster.get(1).flush(KEYSPACE); + // expect a single sstable, where minTimestamp equals the timestamp of the partition delete + cluster.get(1).runOnInstance(() -> { + Set<SSTableReader> sstables = Keyspace.open(KEYSPACE) + .getColumnFamilyStore("tbl") + 
.getLiveSSTables(); - assertEquals(1, sstables.size()); - assertEquals(1, sstables.iterator().next().getMinTimestamp()); ++ assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size()); ++ long minTimestamp = sstables.iterator().next().getMinTimestamp(); ++ assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp); + }); + + // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"); + + + Object[][] rows = cluster.coordinator(1) + .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5", + ConsistencyLevel.ALL); - assertEquals(0, rows.length); ++ assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length); + } + } + + @Test + public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"); + // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp + cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0"); + // and a row from a different partition, to provide the sstable's min/max clustering + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1"); + cluster.get(1).flush(KEYSPACE); + // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we + // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly, + // this sstable's minTimestamp is > than the maxTimestamp of the first sstable. 
This would cause the first + // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't + // know what data and/or tombstones are present on other nodes + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2"); + cluster.get(1).flush(KEYSPACE); + + // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"); + + Object[][] rows = cluster.coordinator(1) + .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5", + ConsistencyLevel.ALL); + // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from + // node 1 (0, 6, 6) was not. + assertRows(rows, new Object[] {0, 6 ,6}); + } + } + + @Test + public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable + { + // skipped sstables must still be added back even when the partition delete ts is < the local min ts + + try (Cluster cluster = init(Cluster.create(2))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"); + // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp + cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0"); + // and a row from a different partition, to provide the sstable's min/max clustering + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3"); + cluster.get(1).flush(KEYSPACE); + // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we + // insert a row which is not shadowed by the partition delete and flush to a second sstable. 
The first sstable + // has a maxTimestamp greater than the min timestamp of all sstables, so it is a candidate for reinclusion to the + // merge. However, the second sstable's minTimestamp is greater than the partition delete. This would cause the + // first sstable not to be reincluded in the merge input, but we can't really make that decision as we don't + // know what data and/or tombstones are present on other nodes + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2"); + cluster.get(1).flush(KEYSPACE); + + // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"); + + Object[][] rows = cluster.coordinator(1) + .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5", + ConsistencyLevel.ALL); + // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from + // node 1 (0, 6, 6) was not. + assertRows(rows, new Object[] {0, 6 ,6}); + } + } + private long readCount(IInvokableInstance instance) { return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount()); diff --cc test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java index c916d13,0000000..169e09d mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java @@@ -1,291 -1,0 +1,293 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.net.UnknownHostException; +import java.util.Random; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; ++import org.apache.cassandra.db.Slices; ++import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; + +public class RepairedDataVerifierTest +{ + private static final String TEST_NAME = "read_command_vh_test_"; + private static final String KEYSPACE = TEST_NAME + "cql_keyspace"; + private static final String TABLE = "table1"; + + private final Random random = new Random(); + private TableMetadata metadata; + private TableMetrics 
metrics; + + // counter to generate the last byte of peer addresses + private int addressSuffix = 10; + + @BeforeClass + public static void init() + { + SchemaLoader.loadSchema(); + SchemaLoader.schemaDefinition(TEST_NAME); + DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true); + } + + @Before + public void setup() + { + metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); + metrics = ColumnFamilyStore.metricsFor(metadata.id); + } + + @Test + public void repairedDataMismatchWithSomeConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount + 1 , unconfirmedCount()); + } + + @Test + public void repairedDataMismatchWithNoneConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount + 1 , unconfirmedCount()); + } + + @Test + public void repairedDataMismatchWithAllConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort 
peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true); + + tracker.verify(); + assertEquals(confirmedCount + 1, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void repairedDataMatchesWithAllConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void repairedDataMatchesWithSomeConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void repairedDataMatchesWithNoneConclusive() + { + long confirmedCount = confirmedCount(); + long unconfirmedCount = 
unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false); + tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void allEmptyDigestWithAllConclusive() + { + // if a read didn't touch any repaired sstables, digests will be empty + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void allEmptyDigestsWithSomeConclusive() + { + // if a read didn't touch any repaired sstables, digests will be empty + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true); + tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + 
assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void allEmptyDigestsWithNoneConclusive() + { + // if a read didn't touch any repaired sstables, digests will be empty + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + InetAddressAndPort peer1 = peer(); + InetAddressAndPort peer2 = peer(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + @Test + public void noTrackingDataRecorded() + { + // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded + long confirmedCount = confirmedCount(); + long unconfirmedCount = unconfirmedCount(); + RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key())); + RepairedDataTracker tracker = new RepairedDataTracker(verifier); + tracker.verify(); + assertEquals(confirmedCount, confirmedCount()); + assertEquals(unconfirmedCount, unconfirmedCount()); + } + + private long confirmedCount() + { + return metrics.confirmedRepairedInconsistencies.table.getCount(); + } + + private long unconfirmedCount() + { + return metrics.unconfirmedRepairedInconsistencies.table.getCount(); + } + + private InetAddressAndPort peer() + { + try + { + return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ }); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + private int key() + { + return random.nextInt(); + } + + private ReadCommand command(int key) + { + return new StubReadCommand(key, metadata, false); + } + + private static class StubReadCommand 
extends SinglePartitionReadCommand + { + StubReadCommand(int key, TableMetadata metadata, boolean isDigest) + { + super(isDigest, + 0, + false, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), - null, ++ new ClusteringIndexSliceFilter(Slices.ALL, false), + null); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org