This is an automated email from the ASF dual-hosted git repository. jlewandowski pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 14936d0bd3716ed251e799a264f5ab16d51b893b Merge: 780f8b94ff 4b9c18235a Author: Jacek Lewandowski <lewandowski.ja...@gmail.com> AuthorDate: Fri Mar 31 17:30:20 2023 +0200 Merge branch 'cassandra-4.0' into cassandra-4.1 * cassandra-4.0: Save host id to system.local and flush immediately after startup CHANGES.txt | 5 +- .../org/apache/cassandra/db/SystemKeyspace.java | 23 ++- .../apache/cassandra/db/commitlog/CommitLog.java | 47 ++++-- .../cassandra/db/compaction/CompactionManager.java | 76 +++++++-- .../apache/cassandra/service/StorageService.java | 173 ++++++++++++++++----- .../cassandra/tools/SSTableMetadataViewer.java | 31 ++-- .../cassandra/distributed/impl/Instance.java | 12 +- .../cassandra/distributed/impl/InstanceConfig.java | 4 +- .../distributed/test/IPMembershipTest.java | 4 + .../distributed/test/SSTableIdGenerationTest.java | 24 +-- .../unit/org/apache/cassandra/db/KeyCacheTest.java | 2 - .../cassandra/db/compaction/NeverPurgeTest.java | 8 +- 12 files changed, 302 insertions(+), 107 deletions(-) diff --cc CHANGES.txt index dec7f680f2,675d423080..c12983568e --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,21 -1,4 +1,24 @@@ -4.0.9 +4.1.2 + * Return snapshots with dots in their name in nodetool listsnapshots (CASSANDRA-18371) + * Fix NPE when loading snapshots and data directory is one directory from root (CASSANDRA-18359) + * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304) + * Fix COPY ... TO STDOUT behavior in cqlsh (CASSANDRA-18353) + * Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354) - ++Merged from 4.0: ++Merged from 3.11: ++Merged from 3.0: ++ * Save host id to system.local and flush immediately after startup (CASSANDRA-18153) + +4.1.1 + * Deprecate org.apache.cassandra.hadoop code (CASSANDRA-16984) + * Fix too early schema version change in system local table (CASSANDRA-18291) + * Fix copying of JAR of a trigger to temporary file (CASSANDRA-18264) + * Fix possible NoSuchFileException when removing a snapshot (CASSANDRA-18211) + * PaxosPrepare may add instances to the Electorate that are not in gossip (CASSANDRA-18194) + * Fix PAXOS2_COMMIT_AND_PREPARE_RSP serialisation AssertionError (CASSANDRA-18164) + * Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110) + * Fix perpetual load of denylist on read in cases where denylist can never be loaded (CASSANDRA-18116) +Merged from 4.0: * Fix BufferPool incorrect memoryInUse when putUnusedPortion is used (CASSANDRA-18311) * Improve memtable allocator accounting when updating AtomicBTreePartition (CASSANDRA-18125) * Update zstd-jni to version 1.5.4-1 (CASSANDRA-18259) diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 1523720d53,d63ee77736..fd2145b30c --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -559,7 -498,18 +561,13 @@@ public final class SystemKeyspac DECOMMISSIONED } - public static void finishStartup() - { - Schema.instance.saveSystemKeyspace(); - } - public static void persistLocalMetadata() + { + persistLocalMetadata(UUID::randomUUID); + } + + @VisibleForTesting + public static void persistLocalMetadata(Supplier<UUID> nodeIdSupplier) { String req = "INSERT INTO system.%s (" + "key," + diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java index a832b5ea42,49eb67b1df..6195b1b4ca --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@@ -17,19 -17,23
+17,25 @@@ */ package org.apache.cassandra.db.commitlog; - -import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.FileStore; - import java.util.*; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.TreeMap; + import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; - import org.apache.cassandra.io.util.File; +import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -43,7 -47,7 +49,8 @@@ import org.apache.cassandra.io.compress import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; -import org.apache.cassandra.io.util.FileUtils; ++import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.CompressionParams; @@@ -52,10 -56,9 +59,9 @@@ import org.apache.cassandra.security.En import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; --import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator; import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE; import static org.apache.cassandra.utils.FBUtilities.updateChecksum; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@@ -70,6 -73,8 +76,8 @@@ public class CommitLog implements Commi public static final CommitLog instance = CommitLog.construct(); - private static final FilenameFilter unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name); ++ private static final BiPredicate<File, String> unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name); + final public AbstractCommitLogSegmentManager segmentManager; public final CommitLogArchiver archiver; @@@ -82,7 -87,7 +90,6 @@@ private static CommitLog construct() { CommitLog log = new CommitLog(CommitLogArchiver.construct(), DatabaseDescriptor.getCommitLogSegmentMgrProvider()); -- MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog"); return log; } @@@ -145,6 -150,24 +152,24 @@@ return this; } + public boolean isStarted() + { + return started; + } + + public boolean hasFilesToReplay() + { + return getUnmanagedFiles().length > 0; + } + + private File[] getUnmanagedFiles() + { - File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter); ++ File[] files = new File(segmentManager.storageDirectory).tryList(unmanagedFilesFilter); + if (files == null) + return new File[0]; + return files; + } + /** * Perform recovery on commit logs located in the directory specified by the config file. 
* @@@ -156,12 -179,10 +181,10 @@@ // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904 // The files may have already been archived by normal CommitLog operation. This may cause errors in this // archiving pass, which we should not treat as serious. - for (File file : new File(segmentManager.storageDirectory).tryList(unmanagedFilesFilter)) + for (File file : getUnmanagedFiles()) { - archiver.maybeArchive(file.getPath(), file.getName()); - archiver.maybeWaitForArchiving(file.getName()); + archiver.maybeArchive(file.path(), file.name()); + archiver.maybeWaitForArchiving(file.name()); } assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore"; @@@ -176,7 -197,7 +199,7 @@@ } else { -- Arrays.sort(files, new CommitLogSegmentFileComparator()); ++ Arrays.sort(files, new CommitLogSegment.CommitLogSegmentFileComparator()); logger.info("Replaying {}", StringUtils.join(files, ", ")); replayed = recoverFiles(files); logger.info("Log replay complete, {} replayed mutations", replayed); diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index bb2d89cda7,b53f3319fa..347f9b3ed0 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -17,8 -17,20 +17,17 @@@ */ package org.apache.cassandra.db.compaction; -import java.io.File; import java.io.IOException; - import java.util.*; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@@ -34,25 -48,36 +43,31 @@@ import javax.management.openmbean.Tabul import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - import com.google.common.collect.*; + import com.google.common.collect.ArrayListMultimap; + import com.google.common.collect.Collections2; + import com.google.common.collect.ConcurrentHashMultiset; + import com.google.common.collect.Iterables; + import com.google.common.collect.Lists; + import com.google.common.collect.Maps; + import com.google.common.collect.Multimap; + import com.google.common.collect.Multiset; + import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; - - import org.apache.cassandra.concurrent.ExecutorFactory; - import org.apache.cassandra.concurrent.WrappedExecutorPlus; - import org.apache.cassandra.dht.AbstractBounds; - import org.apache.cassandra.io.util.File; - import org.apache.cassandra.locator.RangesAtEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cache.AutoSavingCache; - import org.apache.cassandra.exceptions.ConfigurationException; - import org.apache.cassandra.repair.NoSuchRepairSessionException; - import org.apache.cassandra.schema.TableMetadata; -import 
org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; ++import org.apache.cassandra.concurrent.ExecutorFactory; ++import org.apache.cassandra.concurrent.WrappedExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.schema.Schema; - import org.apache.cassandra.db.*; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.DecoratedKey; + import org.apache.cassandra.db.Directories; + import org.apache.cassandra.db.DiskBoundaries; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.SerializationHeader; + import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@@ -65,8 -91,8 +81,9 @@@ import org.apache.cassandra.dht.Abstrac import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; + import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.IndexSummaryRedistribution; @@@ -75,16 -102,22 +92,26 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; ++import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; + import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.metrics.TableMetrics; ++import org.apache.cassandra.repair.NoSuchRepairSessionException; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; + import org.apache.cassandra.schema.Schema; + import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.PreviewKind; - import org.apache.cassandra.utils.*; + import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.MBeanWrapper; + import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.UUIDGen; ++import org.apache.cassandra.utils.TimeUUID; + import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.concurrent.Refs; import static java.util.Collections.singleton; @@@ -233,6 -259,26 +260,27 @@@ public class CompactionManager implemen return false; } + @VisibleForTesting + public boolean hasOngoingOrPendingTasks() + { + if (!active.getCompactions().isEmpty() || !compactingCF.isEmpty()) + return true; + + int pendingTasks = executor.getPendingTaskCount() + + validationExecutor.getPendingTaskCount() + + viewBuildExecutor.getPendingTaskCount() + + cacheCleanupExecutor.getPendingTaskCount(); + if (pendingTasks > 0) + return true; + + int activeTasks = executor.getActiveTaskCount() + + 
validationExecutor.getActiveTaskCount() + + viewBuildExecutor.getActiveTaskCount() + + cacheCleanupExecutor.getActiveTaskCount(); + + return activeTasks > 0; + } ++ /** * Shuts down both compaction and validation executors, cancels running compaction / validation, * and waits for tasks to complete if tasks were not cancelable. diff --cc src/java/org/apache/cassandra/service/StorageService.java index 6d0d24eb94,e7c6fabc75..92c31c1f7a --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -23,12 -22,29 +23,29 @@@ import java.io.DataInputStream import java.io.IOError; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.nio.file.Paths; +import java.time.Instant; - import java.util.*; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.EnumMap; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Iterator; + import java.util.LinkedHashMap; + import java.util.List; + import java.util.Map; import java.util.Map.Entry; + import java.util.Optional; + import java.util.Scanner; + import java.util.Set; + import java.util.SortedMap; + import java.util.TreeMap; + import java.util.TreeSet; + import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@@ -56,44 -71,46 +72,48 @@@ import com.google.common.base.Joiner import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; - import com.google.common.collect.*; - import com.google.common.util.concurrent.*; - - import org.apache.cassandra.config.CassandraRelevantProperties; - import org.apache.cassandra.concurrent.*; - import org.apache.cassandra.config.DataStorageSpec; - import org.apache.cassandra.cql3.QueryHandler; - import org.apache.cassandra.dht.RangeStreamer.FetchReplica; - import org.apache.cassandra.fql.FullQueryLogger; - import org.apache.cassandra.fql.FullQueryLoggerOptions; - import org.apache.cassandra.fql.FullQueryLoggerOptionsCompositeData; - import org.apache.cassandra.io.util.File; - import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; - import org.apache.cassandra.schema.Keyspaces; - import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; - import org.apache.cassandra.utils.concurrent.Future; - import org.apache.cassandra.schema.TableId; - import org.apache.cassandra.utils.concurrent.FutureCombiner; - import org.apache.cassandra.utils.concurrent.ImmediateFuture; + import com.google.common.collect.HashMultimap; + import com.google.common.collect.ImmutableList; ++import com.google.common.collect.ImmutableMap; + import com.google.common.collect.Iterables; + import com.google.common.collect.Lists; + import com.google.common.collect.Maps; + import com.google.common.collect.Multimap; + import com.google.common.collect.Ordering; + import com.google.common.collect.Sets; + import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; + import com.google.common.util.concurrent.RateLimiter; + import com.google.common.util.concurrent.Uninterruptibles; import
org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; +import org.apache.cassandra.auth.AuthCacheService; - import org.apache.cassandra.config.Config.PaxosStatePurging; - import org.apache.cassandra.service.paxos.*; - import org.apache.cassandra.service.paxos.cleanup.*; - import org.apache.cassandra.utils.progress.ProgressListener; import org.apache.cassandra.auth.AuthKeyspace; import org.apache.cassandra.auth.AuthSchemaChangeListener; import org.apache.cassandra.batchlog.BatchlogManager; + import org.apache.cassandra.concurrent.ExecutorLocals; ++import org.apache.cassandra.concurrent.FutureTask; ++import org.apache.cassandra.concurrent.FutureTaskWithResources; + import org.apache.cassandra.concurrent.NamedThreadFactory; + import org.apache.cassandra.concurrent.ScheduledExecutors; + import org.apache.cassandra.concurrent.Stage; ++import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.Config; ++import org.apache.cassandra.config.Config.PaxosStatePurging; ++import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.config.DurationSpec; + import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryProcessor; - import org.apache.cassandra.db.*; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Directories; + import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; + import org.apache.cassandra.db.SizeEstimatesRecorder; + import org.apache.cassandra.db.SnapshotDetailsTabularData; + import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Verifier; @@@ -108,33 -142,58 +145,74 @@@ import org.apache.cassandra.hints.Hints import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.VersionAndType; ++import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.PathUtils; - import org.apache.cassandra.locator.*; + import org.apache.cassandra.locator.AbstractReplicationStrategy; + import org.apache.cassandra.locator.DynamicEndpointSnitch; + import org.apache.cassandra.locator.EndpointsByRange; + import org.apache.cassandra.locator.EndpointsByReplica; + import org.apache.cassandra.locator.EndpointsForRange; + import org.apache.cassandra.locator.EndpointsForToken; + import org.apache.cassandra.locator.IEndpointSnitch; + import org.apache.cassandra.locator.InetAddressAndPort; + import org.apache.cassandra.locator.LocalStrategy; + import org.apache.cassandra.locator.NetworkTopologyStrategy; + import org.apache.cassandra.locator.RangesAtEndpoint; + import org.apache.cassandra.locator.RangesByEndpoint; + import org.apache.cassandra.locator.Replica; + import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; + import org.apache.cassandra.locator.Replicas; + import org.apache.cassandra.locator.SystemReplicas; + import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.metrics.StorageMetrics; - import org.apache.cassandra.net.*; 
- import org.apache.cassandra.repair.*; + import org.apache.cassandra.net.AsyncOneResponse; + import org.apache.cassandra.net.Message; + import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.repair.RepairRunnable; -import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.MigrationCoordinator; -import org.apache.cassandra.schema.MigrationManager; ++import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SchemaTransformations; +import org.apache.cassandra.schema.SystemDistributedKeyspace; ++import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ViewMetadata; ++import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; ++import org.apache.cassandra.service.paxos.Paxos; ++import org.apache.cassandra.service.paxos.PaxosCommit; ++import org.apache.cassandra.service.paxos.PaxosRepair; ++import org.apache.cassandra.service.paxos.PaxosState; ++import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator; ++import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; +import org.apache.cassandra.service.snapshot.SnapshotManager; +import org.apache.cassandra.service.snapshot.TableSnapshot; - import org.apache.cassandra.net.AsyncOneResponse; - import org.apache.cassandra.net.MessagingService; - import org.apache.cassandra.streaming.*; + import org.apache.cassandra.streaming.StreamManager; + import org.apache.cassandra.streaming.StreamOperation; + import org.apache.cassandra.streaming.StreamPlan; + import org.apache.cassandra.streaming.StreamResultFuture; + import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.transport.ClientResourceLimits; import org.apache.cassandra.transport.ProtocolVersion; - import org.apache.cassandra.utils.*; ++import org.apache.cassandra.utils.ExecutorUtils; + import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.MBeanWrapper; + import org.apache.cassandra.utils.MD5Digest; + import org.apache.cassandra.utils.OutputHandler; + import org.apache.cassandra.utils.Pair; + import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.WindowsTimer; + import org.apache.cassandra.utils.WrappedRunnable; ++import org.apache.cassandra.utils.concurrent.Future; ++import org.apache.cassandra.utils.concurrent.FutureCombiner; ++import org.apache.cassandra.utils.concurrent.ImmediateFuture; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; @@@ -145,10 -205,9 +224,10 @@@ import static com.google.common.collect import static com.google.common.collect.Iterables.tryFind; import static java.util.Arrays.asList; import static java.util.Arrays.stream; - import static java.util.concurrent.TimeUnit.MINUTES; import static 
java.util.concurrent.TimeUnit.MILLISECONDS; + import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS; @@@ -159,11 -218,8 +238,12 @@@ import static org.apache.cassandra.inde import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; - import static org.apache.cassandra.service.ActiveRepairService.*; -import static org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace; ++import static org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; ++import static org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; - import static org.apache.cassandra.utils.FBUtilities.now; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; ++import static org.apache.cassandra.utils.FBUtilities.now; /** * This abstraction contains the token/identifier of this node @@@ -2386,7 -2309,15 +2467,15 @@@ public class StorageService extends Not public UUID getLocalHostUUID() { - return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort()); + UUID id = getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort()); + if (id != null) + return id; + // this condition is to prevent accessing the tables when the node is not started yet, and in particular, + // when it is not going to be started at all (e.g. when running some unit tests or client tools). 
- else if (CommitLog.instance.isStarted()) ++ else if ((DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized()) && CommitLog.instance.isStarted()) + return SystemKeyspace.getLocalHostId(); + + return null; } public Map<String, String> getHostIdMap() diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index b7164e8d7e,bad7b13150..c4bef35bfd mode 100644,100755..100644 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@@ -17,14 -17,8 +17,7 @@@ */ package org.apache.cassandra.tools; - import static org.apache.cassandra.tools.Util.BLUE; - import static org.apache.cassandra.tools.Util.CYAN; - import static org.apache.cassandra.tools.Util.RESET; - import static org.apache.cassandra.tools.Util.WHITE; - import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; - import static org.apache.commons.lang3.time.DurationFormatUtils.formatDurationWords; - import java.io.DataInputStream; -import java.io.File; import java.io.IOException; import java.io.PrintStream; import java.io.PrintWriter; @@@ -65,15 -67,12 +67,13 @@@ import org.apache.cassandra.schema.Tabl import org.apache.cassandra.tools.Util.TermHistogram; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; - import org.apache.commons.cli.CommandLine; - import org.apache.commons.cli.CommandLineParser; - import org.apache.commons.cli.HelpFormatter; - import org.apache.commons.cli.Option; - import org.apache.commons.cli.Options; - import org.apache.commons.cli.ParseException; - import org.apache.commons.cli.PosixParser; - import com.google.common.collect.MinMaxPriorityQueue; + import static org.apache.cassandra.tools.Util.BLUE; + import static org.apache.cassandra.tools.Util.CYAN; + import static org.apache.cassandra.tools.Util.RESET; + import static org.apache.cassandra.tools.Util.WHITE; ++import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + import static org.apache.commons.lang3.time.DurationFormatUtils.formatDurationWords; /** * Shows the contents of sstable metadata @@@ -372,9 -371,10 +372,10 @@@ public class SSTableMetadataViewe field("maxClusteringValues", Arrays.toString(maxValues)); } field("Estimated droppable tombstones", - stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - this.gc)); + stats.getEstimatedDroppableTombstoneRatio((int) (currentTimeMillis() / 1000) - this.gc)); field("SSTable Level", stats.sstableLevel); field("Repaired at", stats.repairedAt, toDateString(stats.repairedAt, TimeUnit.MILLISECONDS)); + field("Originating host id", stats.originatingHostId); field("Pending repair", stats.pendingRepair); field("Replay positions covered", stats.commitLogIntervals); field("totalColumnsSet", stats.totalColumnsSet); diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 44c6f8bf99,b8833dd319..d4cb1cb9ed --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -123,15 -117,9 +123,15 @@@ import org.apache.cassandra.service.Pen import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.StorageServiceMBean; +import org.apache.cassandra.service.paxos.PaxosRepair; +import org.apache.cassandra.service.paxos.PaxosState; ++import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData; +import 
org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings; +import org.apache.cassandra.service.snapshot.SnapshotManager; +import org.apache.cassandra.streaming.StreamManager; - import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData; import org.apache.cassandra.streaming.StreamReceiveTask; import org.apache.cassandra.streaming.StreamTransferTask; -import org.apache.cassandra.streaming.async.StreamingInboundHandler; +import org.apache.cassandra.streaming.async.NettyStreamingChannel; import org.apache.cassandra.tools.NodeTool; import org.apache.cassandra.tools.Output; import org.apache.cassandra.tools.SystemExitException; @@@ -599,8 -512,8 +599,8 @@@ public class Instance extends IsolatedE // We need to persist this as soon as possible after startup checks. // This should be the first write to SystemKeyspace (CASSANDRA-11742) - SystemKeyspace.persistLocalMetadata(); + SystemKeyspace.persistLocalMetadata(config::hostId); - SystemKeyspaceMigrator40.migrate(); + SystemKeyspaceMigrator41.migrate(); // Same order to populate tokenMetadata for the first time, // see org.apache.cassandra.service.CassandraDaemon.setup @@@ -797,6 -754,9 +797,7 @@@ @Override public Future<Void> shutdown(boolean graceful) { - if (!graceful) - MessagingService.instance().shutdown(1L, MINUTES, false, true); - ++ inInstancelogger.info("Shutting down instance {} / {}", config.num(), config.broadcastAddress().getHostString()); Future<?> future = async((ExecutorService executor) -> { Throwable error = null; @@@ -810,21 -770,12 +811,26 @@@ } error = parallelRun(error, executor, StorageService.instance::disableAutoCompaction); + while (CompactionManager.instance.hasOngoingOrPendingTasks() && !Thread.currentThread().isInterrupted()) + { + inInstancelogger.info("Waiting for compactions to finish"); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + // trigger init early or else it could try to init and touch a thread pool that got shutdown + HintsService hints = HintsService.instance; + ThrowingRunnable shutdownHints = () -> { + // this is to allow shutdown in the case hints were halted already + try + { + HintsService.instance.shutdownBlocking(); + } + catch (IllegalStateException e) + { + if (!"HintsService has already been shut down".equals(e.getMessage())) + throw e; + } + }; error = parallelRun(error, executor, () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES), CompactionManager.instance::forceShutdown, diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index a27daac0e6,037c221a37..92c56d6267 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@@ -28,10 -27,11 +28,9 @@@ import java.util.Map import java.util.TreeMap; import java.util.UUID; import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.stream.Collectors; import com.vdurmont.semver4j.Semver; - import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.shared.NetworkTopology; diff --cc test/distributed/org/apache/cassandra/distributed/test/SSTableIdGenerationTest.java index 8d05f3dc71,0000000000..dd37bd3d60 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SSTableIdGenerationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableIdGenerationTest.java @@@ -1,531 
-1,0 +1,531 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableSet; +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.compaction.DateTieredCompactionStrategy; +import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; +import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy; +import org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SequenceBasedSSTableId; +import org.apache.cassandra.io.sstable.UUIDBasedSSTableId; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.tools.SystemExitException; +import org.apache.cassandra.utils.TimeUUID; +import org.assertj.core.api.Assertions; +import org.assertj.core.data.Offset; + +import static java.lang.String.format; +import static org.apache.cassandra.Util.bulkLoadSSTables; +import static org.apache.cassandra.Util.getBackups; +import static org.apache.cassandra.Util.getSSTables; +import static org.apache.cassandra.Util.getSnapshots; +import static org.apache.cassandra.Util.relativizePath; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; +import static org.apache.cassandra.db.SystemKeyspace.LEGACY_SSTABLE_ACTIVITY; +import static org.apache.cassandra.db.SystemKeyspace.SSTABLE_ACTIVITY_V2; +import static org.apache.cassandra.distributed.shared.FutureUtils.waitOn; +import static org.apache.cassandra.distributed.test.ExecUtil.rethrow; +import static org.assertj.core.api.Assertions.assertThat; + +public class SSTableIdGenerationTest extends TestBaseImpl +{ + private final static String ENABLE_UUID_FIELD_NAME = "uuid_sstable_identifiers_enabled"; + private final static String SNAPSHOT_TAG = "test"; + + private 
int v; + + private static SecurityManager originalSecurityManager; + + @BeforeClass + public static void beforeClass() throws Throwable + { + TestBaseImpl.beforeClass(); + + originalSecurityManager = System.getSecurityManager(); + // we prevent system exit and convert it to an exception because this is one of the expected test outcomes, + // and we want to make an assertion on that + ClusterUtils.preventSystemExit(); + } + + @AfterClass + public static void afterClass() throws Throwable + { + System.setSecurityManager(originalSecurityManager); + } + + /** + * This test verifies that a node with uuid disabled actually creates sstables with sequential ids and that + * both the current and legacy sstable activity tables are updated. + * Then, when uuid is enabled, we actually create sstables with uuid but keep and can still read the old sstables. Also, + * only the current sstable activity table is updated. + */ + @Test + public void testRestartWithUUIDEnabled() throws IOException + { + try (Cluster cluster = init(Cluster.build(1) + .withDataDirCount(1) + .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, false)) + .start())) + { + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl", null)); + createSSTables(cluster.get(1), KEYSPACE, "tbl", 1, 2); + assertSSTablesCount(cluster.get(1), 2, 0, KEYSPACE, "tbl"); + verfiySSTableActivity(cluster, true); + + restartNode(cluster, 1, true); + + createSSTables(cluster.get(1), KEYSPACE, "tbl", 3, 4); - assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl"); ++ assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl"); + verfiySSTableActivity(cluster, false); + + checkRowsNumber(cluster.get(1), KEYSPACE, "tbl", 9); + } + } + + /** + * This test verifies that we should not be able to start a node with uuid disabled when there are uuid sstables + */ + @Test + public void testRestartWithUUIDDisabled() throws IOException + { + try (Cluster cluster = init(Cluster.build(1) + .withDataDirCount(1) + .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, true)) + .start())) + { + cluster.disableAutoCompaction(KEYSPACE); + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl", null)); + createSSTables(cluster.get(1), KEYSPACE, "tbl", 1, 2); + assertSSTablesCount(cluster.get(1), 0, 2, KEYSPACE, "tbl"); + verfiySSTableActivity(cluster, false); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> restartNode(cluster, 1, false)) + .withCauseInstanceOf(SystemExitException.class); + } + } + + @Test + public final void testCompactionStrategiesWithMixedSSTables() throws Exception + { + testCompactionStrategiesWithMixedSSTables(SizeTieredCompactionStrategy.class, + DateTieredCompactionStrategy.class, + TimeWindowCompactionStrategy.class, + LeveledCompactionStrategy.class); + } + + /** + * The purpose of this test is to verify that we can compact, using the given strategy, the mix of sstables created + * with sequential id and with uuid. Then we verify whether the number of results matches the number of rows which we + * would get by merging data from the initial sstables. + */ + @SafeVarargs + private final void testCompactionStrategiesWithMixedSSTables(final Class<? extends AbstractCompactionStrategy>... compactionStrategyClasses) throws Exception + { + try (Cluster cluster = init(Cluster.build(1) + .withDataDirCount(1) + .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, false)) + .start())) + { + // create a table and two sstables with sequential id for each strategy; the sstables will contain overlapping partitions + for (Class<?
extends AbstractCompactionStrategy> compactionStrategyClass : compactionStrategyClasses) + { + String tableName = "tbl_" + compactionStrategyClass.getSimpleName().toLowerCase(); + cluster.schemaChange(createTableStmt(KEYSPACE, tableName, compactionStrategyClass)); + + createSSTables(cluster.get(1), KEYSPACE, tableName, 1, 2); + assertSSTablesCount(cluster.get(1), 2, 0, KEYSPACE, tableName); + } + + // restart the node with uuid enabled + restartNode(cluster, 1, true); + + // create another two sstables with uuid for each previously created table + for (Class<? extends AbstractCompactionStrategy> compactionStrategyClass : compactionStrategyClasses) + { + String tableName = "tbl_" + compactionStrategyClass.getSimpleName().toLowerCase(); + + createSSTables(cluster.get(1), KEYSPACE, tableName, 3, 4); + + // expect to have a mix of sstables with sequential id and uuid - assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, tableName); ++ assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, tableName); + + // after compaction, we expect to have a single sstable with uuid + cluster.get(1).forceCompact(KEYSPACE, tableName); + assertSSTablesCount(cluster.get(1), 0, 1, KEYSPACE, tableName); + + // verify the number of rows + checkRowsNumber(cluster.get(1), KEYSPACE, tableName, 9); + } + } + } + + @Test + public void testStreamingToNodeWithUUIDEnabled() throws Exception + { + testStreaming(true); + } + + @Test + public void testStreamingToNodeWithUUIDDisabled() throws Exception + { + testStreaming(false); + }
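The mixed-id assertions in these tests distinguish the two generations purely by the runtime type of Descriptor.id, just as assertSSTablesCount() does further down in this file. A minimal sketch of that check, assuming only the Descriptor and UUIDBasedSSTableId types this test already imports (the helper class below is illustrative, not part of the commit):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.UUIDBasedSSTableId;

    // Illustrative helper, not part of this commit: partition a set of sstable
    // descriptors by id flavour. Key true = UUID-based ids, key false = sequence-based ids.
    final class SSTableIdKinds
    {
        static Map<Boolean, List<Descriptor>> splitByIdKind(Set<Descriptor> descs)
        {
            return descs.stream()
                        .collect(Collectors.partitioningBy(d -> d.id instanceof UUIDBasedSSTableId));
        }
    }
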
+ /** + * The purpose of this test case is to verify the scenario when we need to stream mixed UUID and seq sstables to + * a node which has: 1) UUID disabled, and 2) UUID enabled; then verify that we can read all the data properly + * from that node alone. + */ + private void testStreaming(boolean uuidEnabledOnTargetNode) throws Exception + { + // start both nodes with uuid disabled + try (Cluster cluster = init(Cluster.build(2) + .withDataDirCount(1) + .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, false).with(Feature.NETWORK)) + .start())) + { + // create an empty table and shut down node 2 + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl", null)); + waitOn(cluster.get(2).shutdown()); + + // create 2 sstables with overlapping partitions on node 1 (with seq ids) + createSSTables(cluster.get(1), KEYSPACE, "tbl", 1, 2); + + // restart node 1 with uuid enabled + restartNode(cluster, 1, true); + + // create 2 sstables with overlapping partitions on node 1 (with UUID ids) + createSSTables(cluster.get(1), KEYSPACE, "tbl", 3, 4); + - assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl"); ++ assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl"); + + // now start node 2 with the requested uuid setting and perform repair + cluster.get(2).config().set(ENABLE_UUID_FIELD_NAME, uuidEnabledOnTargetNode); + cluster.get(2).startup(); + + assertSSTablesCount(cluster.get(2), 0, 0, KEYSPACE, "tbl"); + + // at this point we have sstables with seq and uuid ids on node 1 and no sstables on node 2 - // when we run repair, we expect streaming all 5 sstables from node 1 to node 2 ++ // when we run repair, we expect streaming all 4 sstables from node 1 to node 2 + + cluster.get(2).nodetool("repair", KEYSPACE); + + if (uuidEnabledOnTargetNode) - assertSSTablesCount(cluster.get(2), 0, 5, KEYSPACE, "tbl"); ++ assertSSTablesCount(cluster.get(2), 0, 4, KEYSPACE, "tbl"); + else - assertSSTablesCount(cluster.get(2), 5, 0, KEYSPACE, "tbl"); ++ assertSSTablesCount(cluster.get(2), 4, 0, KEYSPACE, "tbl"); + + waitOn(cluster.get(1).shutdown()); + + checkRowsNumber(cluster.get(2), KEYSPACE, "tbl", 9); + } + } + + @Test + public void testSnapshot() throws Exception + { + File tmpDir = new File(Files.createTempDirectory("test")); + Set<String> seqOnlyBackupDirs; + Set<String> seqAndUUIDBackupDirs; + Set<String> uuidOnlyBackupDirs; + try (Cluster cluster = init(Cluster.build(1) + .withDataDirCount(1) + .withConfig(config -> config.with(Feature.NETWORK) + .set("incremental_backups", true) + .set("snapshot_before_compaction", false) + .set("auto_snapshot", false) + .set(ENABLE_UUID_FIELD_NAME, false)) + .start())) + { + // create the tables + + cluster.schemaChange("CREATE KEYSPACE new_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"); + + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_only", null)); + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_and_uuid", null)); + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_uuid_only", null)); + cluster.schemaChange(createTableStmt("new_ks", "tbl_seq_only", null)); + cluster.schemaChange(createTableStmt("new_ks", "tbl_seq_and_uuid", null)); + cluster.schemaChange(createTableStmt("new_ks", "tbl_uuid_only", null)); + + // creating sstables + createSSTables(cluster.get(1), KEYSPACE, "tbl_seq_only", 1, 2, 3, 4); + createSSTables(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid", 1, 2); + createSSTables(cluster.get(1), "new_ks", "tbl_seq_only", 5, 6, 7, 8); + createSSTables(cluster.get(1), "new_ks", "tbl_seq_and_uuid", 5, 6); + + restartNode(cluster, 1, true); + + createSSTables(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid", 3, 4); + createSSTables(cluster.get(1), KEYSPACE, "tbl_uuid_only", 1, 2, 3, 4); + createSSTables(cluster.get(1), "new_ks", "tbl_seq_and_uuid", 7, 8); +
createSSTables(cluster.get(1), "new_ks", "tbl_uuid_only", 5, 6, 7, 8); + + Set<String> seqOnlySnapshotDirs = snapshot(cluster.get(1), KEYSPACE, "tbl_seq_only"); + Set<String> seqAndUUIDSnapshotDirs = snapshot(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid"); + Set<String> uuidOnlySnapshotDirs = snapshot(cluster.get(1), KEYSPACE, "tbl_uuid_only"); + + seqOnlyBackupDirs = getBackupDirs(cluster.get(1), KEYSPACE, "tbl_seq_only"); + seqAndUUIDBackupDirs = getBackupDirs(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid"); + uuidOnlyBackupDirs = getBackupDirs(cluster.get(1), KEYSPACE, "tbl_uuid_only"); + + // at this point, we should have sstables with backups and snapshots for all tables - assertSSTablesCount(cluster.get(1), 4, 1, KEYSPACE, "tbl_seq_only"); - assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl_seq_and_uuid"); ++ assertSSTablesCount(cluster.get(1), 4, 0, KEYSPACE, "tbl_seq_only"); ++ assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl_seq_and_uuid"); + assertSSTablesCount(cluster.get(1), 0, 4, KEYSPACE, "tbl_uuid_only"); + - assertBackupSSTablesCount(cluster.get(1), 4, 1, KEYSPACE, "tbl_seq_only"); - assertBackupSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl_seq_and_uuid"); ++ assertBackupSSTablesCount(cluster.get(1), 4, 0, KEYSPACE, "tbl_seq_only"); ++ assertBackupSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl_seq_and_uuid"); + assertBackupSSTablesCount(cluster.get(1), 0, 4, KEYSPACE, "tbl_uuid_only"); + - assertSnapshotSSTablesCount(cluster.get(1), 4, 1, KEYSPACE, "tbl_seq_only"); - assertSnapshotSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl_seq_and_uuid"); ++ assertSnapshotSSTablesCount(cluster.get(1), 4, 0, KEYSPACE, "tbl_seq_only"); ++ assertSnapshotSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl_seq_and_uuid"); + assertSnapshotSSTablesCount(cluster.get(1), 0, 4, KEYSPACE, "tbl_uuid_only"); + + checkRowsNumber(cluster.get(1), KEYSPACE, "tbl_seq_only", 9); + checkRowsNumber(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid", 9); + checkRowsNumber(cluster.get(1), KEYSPACE, "tbl_uuid_only", 9); + + // truncate the first set of tables + truncateAndAssertEmpty(cluster.get(1), KEYSPACE, "tbl_seq_only", "tbl_seq_and_uuid", "tbl_uuid_only"); + + restore(cluster.get(1), seqOnlySnapshotDirs, "tbl_seq_only", 9); + restore(cluster.get(1), seqAndUUIDSnapshotDirs, "tbl_seq_and_uuid", 9); + restore(cluster.get(1), uuidOnlySnapshotDirs, "tbl_uuid_only", 9); + + truncateAndAssertEmpty(cluster.get(1), KEYSPACE, "tbl_seq_only", "tbl_seq_and_uuid", "tbl_uuid_only"); + + restore(cluster.get(1), seqOnlyBackupDirs, "tbl_seq_only", 9); + restore(cluster.get(1), seqAndUUIDBackupDirs, "tbl_seq_and_uuid", 9); + restore(cluster.get(1), uuidOnlyBackupDirs, "tbl_uuid_only", 9); + + ImmutableSet<String> allBackupDirs = ImmutableSet.<String>builder().addAll(seqOnlyBackupDirs).addAll(seqAndUUIDBackupDirs).addAll(uuidOnlyBackupDirs).build(); + cluster.get(1).runOnInstance(rethrow(() -> allBackupDirs.forEach(dir -> bulkLoadSSTables(new File(dir), "new_ks")))); + + checkRowsNumber(cluster.get(1), "new_ks", "tbl_seq_only", 17); + checkRowsNumber(cluster.get(1), "new_ks", "tbl_seq_and_uuid", 17); + checkRowsNumber(cluster.get(1), "new_ks", "tbl_uuid_only", 17); + + + for (String dir : allBackupDirs) + { + File src = new File(dir); + File dest = relativizePath(tmpDir, src, 3); + Files.createDirectories(dest.parent().toPath()); + FileUtils.moveDirectory(src.toJavaIOFile(), dest.toJavaIOFile()); + } + } + + try (Cluster cluster = init(Cluster.build(1) + .withDataDirCount(1) + .withConfig(config -> 
config.with(Feature.NETWORK, Feature.NATIVE_PROTOCOL) + .set("incremental_backups", true) + .set("snapshot_before_compaction", false) + .set("auto_snapshot", false) + .set(ENABLE_UUID_FIELD_NAME, false)) + .start())) + { + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_only", null)); + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_and_uuid", null)); + cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_uuid_only", null)); + + Function<String, String> relativeToTmpDir = d -> relativizePath(tmpDir, new File(d), 3).toString(); + restore(cluster.get(1), seqOnlyBackupDirs.stream().map(relativeToTmpDir).collect(Collectors.toSet()), "tbl_seq_only", 9); + restore(cluster.get(1), seqAndUUIDBackupDirs.stream().map(relativeToTmpDir).collect(Collectors.toSet()), "tbl_seq_and_uuid", 9); + restore(cluster.get(1), uuidOnlyBackupDirs.stream().map(relativeToTmpDir).collect(Collectors.toSet()), "tbl_uuid_only", 9); + } + } + + private static void restore(IInvokableInstance instance, Set<String> dirs, String targetTableName, int expectedRowsNum) + { + List<String> failedImports = instance.callOnInstance(() -> ColumnFamilyStore.getIfExists(KEYSPACE, targetTableName) + .importNewSSTables(dirs, false, false, true, true, true, true, true)); + assertThat(failedImports).isEmpty(); + checkRowsNumber(instance, KEYSPACE, targetTableName, expectedRowsNum); + } + + private static void truncateAndAssertEmpty(IInvokableInstance instance, String ks, String... tableNames) + { + for (String tableName : tableNames) + { + instance.executeInternal(format("TRUNCATE %s.%s", ks, tableName)); + assertSSTablesCount(instance, 0, 0, ks, tableName); + checkRowsNumber(instance, ks, tableName, 0); + } + } + + private static Set<String> snapshot(IInvokableInstance instance, String ks, String tableName) + { + Set<String> snapshotDirs = instance.callOnInstance(() -> ColumnFamilyStore.getIfExists(ks, tableName) + .snapshot(SNAPSHOT_TAG) + .getDirectories() + .stream() + .map(File::toString) + .collect(Collectors.toSet())); + assertThat(snapshotDirs).isNotEmpty(); + return snapshotDirs; + } + + private static String createTableStmt(String ks, String name, Class<? extends AbstractCompactionStrategy> compactionStrategy) + { + if (compactionStrategy == null) + compactionStrategy = SizeTieredCompactionStrategy.class; + return format("CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck)) " + + "WITH compaction = {'class':'%s', 'enabled':'false'}", + ks, name, compactionStrategy.getCanonicalName()); + } + + private void createSSTables(IInstance instance, String ks, String tableName, int... 
records) + { + String insert = format("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", ks, tableName); + for (int record : records) + { + instance.executeInternal(insert, record, record, ++v); + instance.executeInternal(insert, record, record + 1, ++v); + instance.executeInternal(insert, record + 1, record + 1, ++v); + instance.flush(ks); + } + } + + private static void assertSSTablesCount(Set<Descriptor> descs, String tableName, int expectedSeqGenIds, int expectedUUIDGenIds) + { + List<String> seqSSTables = descs.stream().filter(desc -> desc.id instanceof SequenceBasedSSTableId).map(Descriptor::baseFilename).sorted().collect(Collectors.toList()); + List<String> uuidSSTables = descs.stream().filter(desc -> desc.id instanceof UUIDBasedSSTableId).map(Descriptor::baseFilename).sorted().collect(Collectors.toList()); + assertThat(seqSSTables).describedAs("SSTables of %s with sequence based id", tableName).hasSize(expectedSeqGenIds); + assertThat(uuidSSTables).describedAs("SSTables of %s with UUID based id", tableName).hasSize(expectedUUIDGenIds); + } + + private static void assertSSTablesCount(IInvokableInstance instance, int expectedSeqGenIds, int expectedUUIDGenIds, String ks, String... tableNames) + { + instance.runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getSSTables(ks, tableName), tableName, expectedSeqGenIds, expectedUUIDGenIds)))); + } + + private static void assertSnapshotSSTablesCount(IInvokableInstance instance, int expectedSeqGenIds, int expectedUUIDGenIds, String ks, String... tableNames) + { + instance.runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getSnapshots(ks, tableName, SNAPSHOT_TAG), tableName, expectedSeqGenIds, expectedUUIDGenIds)))); + } + + private static void assertBackupSSTablesCount(IInvokableInstance instance, int expectedSeqGenIds, int expectedUUIDGenIds, String ks, String... 
tableNames) + { + instance.runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getBackups(ks, tableName), tableName, expectedSeqGenIds, expectedUUIDGenIds)))); + } + + private static Set<String> getBackupDirs(IInvokableInstance instance, String ks, String tableName) + { + return instance.callOnInstance(() -> getBackups(ks, tableName).stream() + .map(d -> d.directory) + .map(File::toString) + .collect(Collectors.toSet())); + } + + private static void verfiySSTableActivity(Cluster cluster, boolean expectLegacyTableIsPopulated) + { + cluster.get(1).runOnInstance(() -> { + RestorableMeter meter = new RestorableMeter(15, 120); + SequenceBasedSSTableId seqGenId = new SequenceBasedSSTableId(1); + SystemKeyspace.persistSSTableReadMeter("ks", "tab", seqGenId, meter); + assertThat(SystemKeyspace.getSSTableReadMeter("ks", "tab", seqGenId)).matches(m -> m.fifteenMinuteRate() == meter.fifteenMinuteRate() + && m.twoHourRate() == meter.twoHourRate()); + + checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, seqGenId.toString(), true); + if (expectLegacyTableIsPopulated) + checkSSTableActivityRow(LEGACY_SSTABLE_ACTIVITY, seqGenId.generation, true); + + SystemKeyspace.clearSSTableReadMeter("ks", "tab", seqGenId); + + checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, seqGenId.toString(), false); + if (expectLegacyTableIsPopulated) + checkSSTableActivityRow(LEGACY_SSTABLE_ACTIVITY, seqGenId.generation, false); + + UUIDBasedSSTableId uuidGenId = new UUIDBasedSSTableId(TimeUUID.Generator.nextTimeUUID()); + SystemKeyspace.persistSSTableReadMeter("ks", "tab", uuidGenId, meter); + assertThat(SystemKeyspace.getSSTableReadMeter("ks", "tab", uuidGenId)).matches(m -> m.fifteenMinuteRate() == meter.fifteenMinuteRate() + && m.twoHourRate() == meter.twoHourRate()); + + checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, uuidGenId.toString(), true); + + SystemKeyspace.clearSSTableReadMeter("ks", "tab", uuidGenId); + + checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, uuidGenId.toString(), false); + }); + } + + private static void checkSSTableActivityRow(String table, Object genId, boolean expectExists) + { + String tableColName = SSTABLE_ACTIVITY_V2.equals(table) ? "table_name" : "columnfamily_name"; + String idColName = SSTABLE_ACTIVITY_V2.equals(table) ? "id" : "generation"; + String cql = "SELECT rate_15m, rate_120m FROM system.%s WHERE keyspace_name=? and %s=? 
and %s=?"; + UntypedResultSet results = executeInternal(format(cql, table, tableColName, idColName), "ks", "tab", genId); + assertThat(results).isNotNull(); + + if (expectExists) + { + assertThat(results.isEmpty()).isFalse(); + UntypedResultSet.Row row = results.one(); + assertThat(row.getDouble("rate_15m")).isEqualTo(15d, Offset.offset(0.001d)); + assertThat(row.getDouble("rate_120m")).isEqualTo(120d, Offset.offset(0.001d)); + } + else + { + assertThat(results.isEmpty()).isTrue(); + } + } + + private static void restartNode(Cluster cluster, int node, boolean uuidEnabled) + { + waitOn(cluster.get(node).shutdown()); + cluster.get(node).config().set(ENABLE_UUID_FIELD_NAME, uuidEnabled); + cluster.get(node).startup(); + } + + private static void checkRowsNumber(IInstance instance, String ks, String tableName, int expectedNumber) + { + SimpleQueryResult result = instance.executeInternalWithResult(format("SELECT * FROM %s.%s", ks, tableName)); + Object[][] rows = result.toObjectArrays(); + assertThat(rows).withFailMessage("Invalid results for %s.%s - should have %d rows but has %d: \n%s", ks, tableName, expectedNumber, + rows.length, result.toString()).hasSize(expectedNumber); + } +} diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java index c673458d86,445fc67c00..475f1a162f --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@@ -350,10 -348,9 +350,9 @@@ public class KeyCacheTes CacheService.instance.keyCache.loadSaved(); - // Here max time to load cache is negative which means no time left to load cache. So the keyCache size should + // Here max time to load cache is zero which means no time left to load cache. So the keyCache size should // be zero after loadSaved(). assertKeyCacheSize(0, KEYSPACE1, cf); - assertEquals(0, CacheService.instance.keyCache.size()); } @Test diff --cc test/unit/org/apache/cassandra/db/compaction/NeverPurgeTest.java index 089ede818e,a5388ca037..42537319ff --- a/test/unit/org/apache/cassandra/db/compaction/NeverPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/NeverPurgeTest.java @@@ -20,9 -20,9 +20,10 @@@ package org.apache.cassandra.db.compact import java.util.Collection; + import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.Util; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace;