This is an automated email from the ASF dual-hosted git repository.

jlewandowski pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 14936d0bd3716ed251e799a264f5ab16d51b893b
Merge: 780f8b94ff 4b9c18235a
Author: Jacek Lewandowski <lewandowski.ja...@gmail.com>
AuthorDate: Fri Mar 31 17:30:20 2023 +0200

    Merge branch 'cassandra-4.0' into cassandra-4.1
    
    * cassandra-4.0:
      Save host id to system.local and flush immediately after startup
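
    The change being merged persists the node's host id into system.local and flushes
    that table immediately during startup, so the id stays durable even if the node
    stops again before any periodic flush. A rough sketch of the idea in Java (only
    persistLocalMetadata and getLocalHostId appear in the diff below; the explicit
    flush call is an assumption about how the immediate flush is triggered, not part
    of this patch):

        // Sketch, not the literal patch:
        SystemKeyspace.persistLocalMetadata(UUID::randomUUID); // writes host_id into system.local
        UUID hostId = SystemKeyspace.getLocalHostId();         // reads back the persisted id
        // assumed flush step; the exact flush API differs between versions:
        // Keyspace.open("system").getColumnFamilyStore("local").forceBlockingFlush(...);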

 CHANGES.txt                                        |   5 +-
 .../org/apache/cassandra/db/SystemKeyspace.java    |  23 ++-
 .../apache/cassandra/db/commitlog/CommitLog.java   |  47 ++++--
 .../cassandra/db/compaction/CompactionManager.java |  76 +++++++--
 .../apache/cassandra/service/StorageService.java   | 173 ++++++++++++++++-----
 .../cassandra/tools/SSTableMetadataViewer.java     |  31 ++--
 .../cassandra/distributed/impl/Instance.java       |  12 +-
 .../cassandra/distributed/impl/InstanceConfig.java |   4 +-
 .../distributed/test/IPMembershipTest.java         |   4 +
 .../distributed/test/SSTableIdGenerationTest.java  |  24 +--
 .../unit/org/apache/cassandra/db/KeyCacheTest.java |   2 -
 .../cassandra/db/compaction/NeverPurgeTest.java    |   8 +-
 12 files changed, 302 insertions(+), 107 deletions(-)

diff --cc CHANGES.txt
index dec7f680f2,675d423080..c12983568e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,4 +1,24 @@@
 -4.0.9
 +4.1.2
 + * Return snapshots with dots in their name in nodetool listsnapshots (CASSANDRA-18371)
 + * Fix NPE when loading snapshots and data directory is one directory from root (CASSANDRA-18359)
 + * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304)
 + * Fix COPY ... TO STDOUT behavior in cqlsh (CASSANDRA-18353)
 + * Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354)
-  
++Merged from 4.0:
++Merged from 3.11:
++Merged from 3.0: 
++ * Save host id to system.local and flush immediately after startup (CASSANDRA-18153)
 +
 +4.1.1
 + * Deprecate org.apache.cassandra.hadoop code (CASSANDRA-16984)
 + * Fix too early schema version change in system local table (CASSANDRA-18291)
 + * Fix copying of JAR of a trigger to temporary file (CASSANDRA-18264)
 + * Fix possible NoSuchFileException when removing a snapshot (CASSANDRA-18211)
 + * PaxosPrepare may add instances to the Electorate that are not in gossip (CASSANDRA-18194)
 + * Fix PAXOS2_COMMIT_AND_PREPARE_RSP serialisation AssertionError (CASSANDRA-18164)
 + * Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110)
 + * Fix perpetual load of denylist on read in cases where denylist can never be loaded (CASSANDRA-18116)
 +Merged from 4.0:
   * Fix BufferPool incorrect memoryInUse when putUnusedPortion is used (CASSANDRA-18311)
   * Improve memtable allocator accounting when updating AtomicBTreePartition (CASSANDRA-18125)
   * Update zstd-jni to version 1.5.4-1 (CASSANDRA-18259)
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1523720d53,d63ee77736..fd2145b30c
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -559,7 -498,18 +561,13 @@@ public final class SystemKeyspac
          DECOMMISSIONED
      }
  
 -    public static void finishStartup()
 -    {
 -        Schema.instance.saveSystemKeyspace();
 -    }
 -
      public static void persistLocalMetadata()
+     {
+         persistLocalMetadata(UUID::randomUUID);
+     }
+ 
+     @VisibleForTesting
+     public static void persistLocalMetadata(Supplier<UUID> nodeIdSupplier)
      {
          String req = "INSERT INTO system.%s (" +
                       "key," +
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a832b5ea42,49eb67b1df..6195b1b4ca
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -17,19 -17,23 +17,25 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
- 
 -import java.io.File;
 -import java.io.FilenameFilter;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 +import java.nio.file.FileStore;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.TreeMap;
+ import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.BiPredicate;
  import java.util.function.Function;
  import java.util.zip.CRC32;
  
  import com.google.common.annotations.VisibleForTesting;
- import org.apache.cassandra.io.util.File;
 +import com.google.common.base.Preconditions;
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -43,7 -47,7 +49,8 @@@ import org.apache.cassandra.io.compress
  import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.io.util.DataOutputBufferFixed;
 -import org.apache.cassandra.io.util.FileUtils;
++import org.apache.cassandra.io.util.File;
 +import org.apache.cassandra.io.util.PathUtils;
  import org.apache.cassandra.metrics.CommitLogMetrics;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.schema.CompressionParams;
@@@ -52,10 -56,9 +59,9 @@@ import org.apache.cassandra.security.En
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
  
  import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
--import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
  import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;
  import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
  import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@@ -70,6 -73,8 +76,8 @@@ public class CommitLog implements Commi
  
      public static final CommitLog instance = CommitLog.construct();
  
 -    private static final FilenameFilter unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
++    private static final BiPredicate<File, String> unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
+ 
      final public AbstractCommitLogSegmentManager segmentManager;
  
      public final CommitLogArchiver archiver;
@@@ -82,7 -87,7 +90,6 @@@
      private static CommitLog construct()
      {
          CommitLog log = new CommitLog(CommitLogArchiver.construct(), DatabaseDescriptor.getCommitLogSegmentMgrProvider());
--
          MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
          return log;
      }
@@@ -145,6 -150,24 +152,24 @@@
          return this;
      }
  
+     public boolean isStarted()
+     {
+         return started;
+     }
+ 
+     public boolean hasFilesToReplay()
+     {
+         return getUnmanagedFiles().length > 0;
+     }
+ 
+     private File[] getUnmanagedFiles()
+     {
 -        File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter);
++        File[] files = new File(segmentManager.storageDirectory).tryList(unmanagedFilesFilter);
+         if (files == null)
+             return new File[0];
+         return files;
+     }
+ 
      /**
       * Perform recovery on commit logs located in the directory specified by the config file.
       *
@@@ -156,12 -179,10 +181,10 @@@
          // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904
          // The files may have already been archived by normal CommitLog operation. This may cause errors in this
          // archiving pass, which we should not treat as serious.
-         for (File file : new File(segmentManager.storageDirectory).tryList(unmanagedFilesFilter))
+         for (File file : getUnmanagedFiles())
          {
 -            archiver.maybeArchive(file.getPath(), file.getName());
 -            archiver.maybeWaitForArchiving(file.getName());
 +            archiver.maybeArchive(file.path(), file.name());
 +            archiver.maybeWaitForArchiving(file.name());
          }
  
          assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
@@@ -176,7 -197,7 +199,7 @@@
          }
          else
          {
--            Arrays.sort(files, new CommitLogSegmentFileComparator());
++            Arrays.sort(files, new CommitLogSegment.CommitLogSegmentFileComparator());
              logger.info("Replaying {}", StringUtils.join(files, ", "));
              replayed = recoverFiles(files);
              logger.info("Log replay complete, {} replayed mutations", 
replayed);
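
Note on getUnmanagedFiles() above: tryList returns null when the commit log storage
directory is missing or unreadable, so the helper normalizes that to an empty array
and both hasFilesToReplay() and the archiving loop can treat "nothing to replay"
uniformly. A minimal standalone sketch of the same pattern in plain java.io (not
Cassandra's own File wrapper):

    static java.io.File[] listOrEmpty(java.io.File dir, java.io.FilenameFilter filter)
    {
        java.io.File[] files = dir.listFiles(filter); // null if dir is missing or unreadable
        return files == null ? new java.io.File[0] : files;
    }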
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bb2d89cda7,b53f3319fa..347f9b3ed0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -17,8 -17,20 +17,17 @@@
   */
  package org.apache.cassandra.db.compaction;
  
 -import java.io.File;
  import java.io.IOException;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
 -import java.util.UUID;
 -import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
@@@ -34,25 -48,36 +43,31 @@@ import javax.management.openmbean.Tabul
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
- import com.google.common.collect.*;
+ import com.google.common.collect.ArrayListMultimap;
+ import com.google.common.collect.Collections2;
+ import com.google.common.collect.ConcurrentHashMultiset;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Multimap;
+ import com.google.common.collect.Multiset;
+ import com.google.common.collect.Sets;
 -import com.google.common.util.concurrent.Futures;
 -import com.google.common.util.concurrent.ListenableFuture;
 -import com.google.common.util.concurrent.ListenableFutureTask;
  import com.google.common.util.concurrent.RateLimiter;
  import com.google.common.util.concurrent.Uninterruptibles;
- 
- import org.apache.cassandra.concurrent.ExecutorFactory;
- import org.apache.cassandra.concurrent.WrappedExecutorPlus;
- import org.apache.cassandra.dht.AbstractBounds;
- import org.apache.cassandra.io.util.File;
- import org.apache.cassandra.locator.RangesAtEndpoint;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import io.netty.util.concurrent.FastThreadLocal;
  import org.apache.cassandra.cache.AutoSavingCache;
- import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.repair.NoSuchRepairSessionException;
- import org.apache.cassandra.schema.TableMetadata;
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 -import org.apache.cassandra.concurrent.NamedThreadFactory;
++import org.apache.cassandra.concurrent.ExecutorFactory;
++import org.apache.cassandra.concurrent.WrappedExecutorPlus;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.schema.Schema;
- import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.DiskBoundaries;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SerializationHeader;
+ import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
  import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -65,8 -91,8 +81,9 @@@ import org.apache.cassandra.dht.Abstrac
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
  import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
@@@ -75,16 -102,22 +92,26 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
++import org.apache.cassandra.io.util.File;
  import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.locator.RangesAtEndpoint;
  import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.metrics.TableMetrics;
++import org.apache.cassandra.repair.NoSuchRepairSessionException;
  import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+ import org.apache.cassandra.schema.Schema;
+ import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.streaming.PreviewKind;
- import org.apache.cassandra.utils.*;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.MBeanWrapper;
+ import org.apache.cassandra.utils.Throwables;
 -import org.apache.cassandra.utils.UUIDGen;
++import org.apache.cassandra.utils.TimeUUID;
+ import org.apache.cassandra.utils.WrappedRunnable;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.ImmediateFuture;
  import org.apache.cassandra.utils.concurrent.Refs;
  
  import static java.util.Collections.singleton;
@@@ -233,6 -259,26 +260,27 @@@ public class CompactionManager implemen
          return false;
      }
  
+     @VisibleForTesting
+     public boolean hasOngoingOrPendingTasks()
+     {
+         if (!active.getCompactions().isEmpty() || !compactingCF.isEmpty())
+             return true;
+ 
+         int pendingTasks = executor.getPendingTaskCount() +
+                            validationExecutor.getPendingTaskCount() +
+                            viewBuildExecutor.getPendingTaskCount() +
+                            cacheCleanupExecutor.getPendingTaskCount();
+         if (pendingTasks > 0)
+             return true;
+ 
+         int activeTasks = executor.getActiveTaskCount() +
+                           validationExecutor.getActiveTaskCount() +
+                           viewBuildExecutor.getActiveTaskCount() +
+                           cacheCleanupExecutor.getActiveTaskCount();
+ 
+         return activeTasks > 0;
+     }
++
      /**
       * Shutdowns both compaction and validation executors, cancels running compaction / validation,
       * and waits for tasks to complete if tasks were not cancelable.
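
hasOngoingOrPendingTasks() above reports work across the compaction, validation,
view-build and cache-cleanup executors, plus any registered active compactions and
compacting column families. The dtest shutdown path further below polls it before
tearing executors down; the polling pattern (mirroring the Instance.java hunk, with
logger, Uninterruptibles and TimeUnit assumed to be in scope):

    while (CompactionManager.instance.hasOngoingOrPendingTasks() && !Thread.currentThread().isInterrupted())
    {
        logger.info("Waiting for compactions to finish");
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    }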
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 6d0d24eb94,e7c6fabc75..92c31c1f7a
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -23,12 -22,29 +23,29 @@@ import java.io.DataInputStream
  import java.io.IOError;
  import java.io.IOException;
  import java.net.InetAddress;
 +import java.net.InetSocketAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
 -import java.nio.file.Paths;
 +import java.time.Instant;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.EnumMap;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
  import java.util.Map.Entry;
+ import java.util.Optional;
+ import java.util.Scanner;
+ import java.util.Set;
+ import java.util.SortedMap;
+ import java.util.TreeMap;
+ import java.util.TreeSet;
+ import java.util.UUID;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.ExecutionException;
@@@ -56,44 -71,46 +72,48 @@@ import com.google.common.base.Joiner
  import com.google.common.base.Preconditions;
  import com.google.common.base.Predicate;
  import com.google.common.base.Predicates;
- import com.google.common.collect.*;
- import com.google.common.util.concurrent.*;
- 
- import org.apache.cassandra.config.CassandraRelevantProperties;
- import org.apache.cassandra.concurrent.*;
- import org.apache.cassandra.config.DataStorageSpec;
- import org.apache.cassandra.cql3.QueryHandler;
- import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
- import org.apache.cassandra.fql.FullQueryLogger;
- import org.apache.cassandra.fql.FullQueryLoggerOptions;
- import org.apache.cassandra.fql.FullQueryLoggerOptionsCompositeData;
- import org.apache.cassandra.io.util.File;
- import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
- import org.apache.cassandra.schema.Keyspaces;
- import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
- import org.apache.cassandra.utils.concurrent.Future;
- import org.apache.cassandra.schema.TableId;
- import org.apache.cassandra.utils.concurrent.FutureCombiner;
- import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+ import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.ImmutableList;
++import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Multimap;
+ import com.google.common.collect.Ordering;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.FutureCallback;
 -import com.google.common.util.concurrent.Futures;
 -import com.google.common.util.concurrent.ListenableFuture;
 -import com.google.common.util.concurrent.MoreExecutors;
+ import com.google.common.util.concurrent.RateLimiter;
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.audit.AuditLogManager;
  import org.apache.cassandra.audit.AuditLogOptions;
 +import org.apache.cassandra.auth.AuthCacheService;
- import org.apache.cassandra.config.Config.PaxosStatePurging;
- import org.apache.cassandra.service.paxos.*;
- import org.apache.cassandra.service.paxos.cleanup.*;
- import org.apache.cassandra.utils.progress.ProgressListener;
  import org.apache.cassandra.auth.AuthKeyspace;
  import org.apache.cassandra.auth.AuthSchemaChangeListener;
  import org.apache.cassandra.batchlog.BatchlogManager;
+ import org.apache.cassandra.concurrent.ExecutorLocals;
++import org.apache.cassandra.concurrent.FutureTask;
++import org.apache.cassandra.concurrent.FutureTaskWithResources;
+ import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.concurrent.ScheduledExecutors;
+ import org.apache.cassandra.concurrent.Stage;
++import org.apache.cassandra.config.CassandraRelevantProperties;
  import org.apache.cassandra.config.Config;
++import org.apache.cassandra.config.Config.PaxosStatePurging;
++import org.apache.cassandra.config.DataStorageSpec;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.config.DurationSpec;
+ import org.apache.cassandra.cql3.QueryHandler;
  import org.apache.cassandra.cql3.QueryProcessor;
- import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.SizeEstimatesRecorder;
+ import org.apache.cassandra.db.SnapshotDetailsTabularData;
+ import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.Verifier;
@@@ -108,33 -142,58 +145,74 @@@ import org.apache.cassandra.hints.Hints
  import org.apache.cassandra.io.sstable.SSTableLoader;
  import org.apache.cassandra.io.sstable.format.SSTableFormat;
  import org.apache.cassandra.io.sstable.format.VersionAndType;
++import org.apache.cassandra.io.util.File;
  import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.PathUtils;
- import org.apache.cassandra.locator.*;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
+ import org.apache.cassandra.locator.DynamicEndpointSnitch;
+ import org.apache.cassandra.locator.EndpointsByRange;
+ import org.apache.cassandra.locator.EndpointsByReplica;
+ import org.apache.cassandra.locator.EndpointsForRange;
+ import org.apache.cassandra.locator.EndpointsForToken;
+ import org.apache.cassandra.locator.IEndpointSnitch;
+ import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.locator.LocalStrategy;
+ import org.apache.cassandra.locator.NetworkTopologyStrategy;
+ import org.apache.cassandra.locator.RangesAtEndpoint;
+ import org.apache.cassandra.locator.RangesByEndpoint;
+ import org.apache.cassandra.locator.Replica;
+ import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
+ import org.apache.cassandra.locator.Replicas;
+ import org.apache.cassandra.locator.SystemReplicas;
+ import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.metrics.StorageMetrics;
- import org.apache.cassandra.net.*;
- import org.apache.cassandra.repair.*;
+ import org.apache.cassandra.net.AsyncOneResponse;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.repair.RepairRunnable;
 -import org.apache.cassandra.repair.SystemDistributedKeyspace;
  import org.apache.cassandra.repair.messages.RepairOption;
  import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
  import org.apache.cassandra.schema.KeyspaceMetadata;
 -import org.apache.cassandra.schema.MigrationCoordinator;
 -import org.apache.cassandra.schema.MigrationManager;
++import org.apache.cassandra.schema.Keyspaces;
  import org.apache.cassandra.schema.ReplicationParams;
  import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.schema.SchemaTransformations;
 +import org.apache.cassandra.schema.SystemDistributedKeyspace;
++import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.schema.TableMetadataRef;
  import org.apache.cassandra.schema.ViewMetadata;
++import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
++import org.apache.cassandra.service.paxos.Paxos;
++import org.apache.cassandra.service.paxos.PaxosCommit;
++import org.apache.cassandra.service.paxos.PaxosRepair;
++import org.apache.cassandra.service.paxos.PaxosState;
++import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
++import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
 +import org.apache.cassandra.service.snapshot.SnapshotManager;
 +import org.apache.cassandra.service.snapshot.TableSnapshot;
- import org.apache.cassandra.net.AsyncOneResponse;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.streaming.*;
+ import org.apache.cassandra.streaming.StreamManager;
+ import org.apache.cassandra.streaming.StreamOperation;
+ import org.apache.cassandra.streaming.StreamPlan;
+ import org.apache.cassandra.streaming.StreamResultFuture;
+ import org.apache.cassandra.streaming.StreamState;
  import org.apache.cassandra.tracing.TraceKeyspace;
  import org.apache.cassandra.transport.ClientResourceLimits;
  import org.apache.cassandra.transport.ProtocolVersion;
- import org.apache.cassandra.utils.*;
++import org.apache.cassandra.utils.ExecutorUtils;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.MBeanWrapper;
+ import org.apache.cassandra.utils.MD5Digest;
+ import org.apache.cassandra.utils.OutputHandler;
+ import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.Throwables;
 -import org.apache.cassandra.utils.WindowsTimer;
+ import org.apache.cassandra.utils.WrappedRunnable;
++import org.apache.cassandra.utils.concurrent.Future;
++import org.apache.cassandra.utils.concurrent.FutureCombiner;
++import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
  import org.apache.cassandra.utils.logging.LoggingSupportFactory;
  import org.apache.cassandra.utils.progress.ProgressEvent;
  import org.apache.cassandra.utils.progress.ProgressEventType;
@@@ -145,10 -205,9 +224,10 @@@ import static com.google.common.collect
  import static com.google.common.collect.Iterables.tryFind;
  import static java.util.Arrays.asList;
  import static java.util.Arrays.stream;
- import static java.util.concurrent.TimeUnit.MINUTES;
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
+ import static java.util.concurrent.TimeUnit.MINUTES;
  import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static java.util.concurrent.TimeUnit.SECONDS;
  import static java.util.stream.Collectors.toList;
  import static java.util.stream.Collectors.toMap;
  import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
@@@ -159,11 -218,8 +238,12 @@@ import static org.apache.cassandra.inde
  import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
  import static org.apache.cassandra.net.NoPayload.noPayload;
  import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
- import static org.apache.cassandra.service.ActiveRepairService.*;
 -import static org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace;
++import static org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
++import static org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor;
 +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
- import static org.apache.cassandra.utils.FBUtilities.now;
  import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
++import static org.apache.cassandra.utils.FBUtilities.now;
  
  /**
   * This abstraction contains the token/identifier of this node
@@@ -2386,7 -2309,15 +2467,15 @@@ public class StorageService extends Not
  
      public UUID getLocalHostUUID()
      {
-         return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort());
+         UUID id = getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort());
+         if (id != null)
+             return id;
+         // this condition is to prevent accessing the tables when the node is not started yet, and in particular,
+         // when it is not going to be started at all (e.g. when running some unit tests or client tools).
 -        else if (CommitLog.instance.isStarted())
++        else if ((DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized()) && CommitLog.instance.isStarted())
+             return SystemKeyspace.getLocalHostId();
+ 
+         return null;
      }
  
      public Map<String, String> getHostIdMap()
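
The guard added to getLocalHostUUID() above makes the system.local fallback
conditional: the table is consulted only when the process was initialized as a
daemon or a tool and the commit log is started, i.e. only when storage is actually
usable. Condensed decision order, with the names taken from the diff:

    UUID id = getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort());
    if (id != null)
        return id;                             // gossip/token metadata already knows this node
    boolean storageUsable = (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized())
                            && CommitLog.instance.isStarted();
    return storageUsable ? SystemKeyspace.getLocalHostId() : null; // never touch tables before startup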
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index b7164e8d7e,bad7b13150..c4bef35bfd
mode 100644,100755..100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -17,14 -17,8 +17,7 @@@
   */
  package org.apache.cassandra.tools;
  
- import static org.apache.cassandra.tools.Util.BLUE;
- import static org.apache.cassandra.tools.Util.CYAN;
- import static org.apache.cassandra.tools.Util.RESET;
- import static org.apache.cassandra.tools.Util.WHITE;
- import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
- import static org.apache.commons.lang3.time.DurationFormatUtils.formatDurationWords;
- 
  import java.io.DataInputStream;
 -import java.io.File;
  import java.io.IOException;
  import java.io.PrintStream;
  import java.io.PrintWriter;
@@@ -65,15 -67,12 +67,13 @@@ import org.apache.cassandra.schema.Tabl
  import org.apache.cassandra.tools.Util.TermHistogram;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
- import org.apache.commons.cli.CommandLine;
- import org.apache.commons.cli.CommandLineParser;
- import org.apache.commons.cli.HelpFormatter;
- import org.apache.commons.cli.Option;
- import org.apache.commons.cli.Options;
- import org.apache.commons.cli.ParseException;
- import org.apache.commons.cli.PosixParser;
  
- import com.google.common.collect.MinMaxPriorityQueue;
+ import static org.apache.cassandra.tools.Util.BLUE;
+ import static org.apache.cassandra.tools.Util.CYAN;
+ import static org.apache.cassandra.tools.Util.RESET;
+ import static org.apache.cassandra.tools.Util.WHITE;
++import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+ import static org.apache.commons.lang3.time.DurationFormatUtils.formatDurationWords;
  
  /**
   * Shows the contents of sstable metadata
@@@ -372,9 -371,10 +372,10 @@@ public class SSTableMetadataViewe
                  field("maxClusteringValues", Arrays.toString(maxValues));
              }
              field("Estimated droppable tombstones",
 -                  stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - this.gc));
 +                  stats.getEstimatedDroppableTombstoneRatio((int) (currentTimeMillis() / 1000) - this.gc));
              field("SSTable Level", stats.sstableLevel);
              field("Repaired at", stats.repairedAt, 
toDateString(stats.repairedAt, TimeUnit.MILLISECONDS));
+             field("Originating host id", stats.originatingHostId);
              field("Pending repair", stats.pendingRepair);
              field("Replay positions covered", stats.commitLogIntervals);
              field("totalColumnsSet", stats.totalColumnsSet);
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 44c6f8bf99,b8833dd319..d4cb1cb9ed
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -123,15 -117,9 +123,15 @@@ import org.apache.cassandra.service.Pen
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.service.StorageServiceMBean;
 +import org.apache.cassandra.service.paxos.PaxosRepair;
 +import org.apache.cassandra.service.paxos.PaxosState;
++import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
 +import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
 +import org.apache.cassandra.service.snapshot.SnapshotManager;
 +import org.apache.cassandra.streaming.StreamManager;
- import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
  import org.apache.cassandra.streaming.StreamReceiveTask;
  import org.apache.cassandra.streaming.StreamTransferTask;
 -import org.apache.cassandra.streaming.async.StreamingInboundHandler;
 +import org.apache.cassandra.streaming.async.NettyStreamingChannel;
  import org.apache.cassandra.tools.NodeTool;
  import org.apache.cassandra.tools.Output;
  import org.apache.cassandra.tools.SystemExitException;
@@@ -599,8 -512,8 +599,8 @@@ public class Instance extends IsolatedE
  
                  // We need to persist this as soon as possible after startup checks.
                  // This should be the first write to SystemKeyspace (CASSANDRA-11742)
-                 SystemKeyspace.persistLocalMetadata();
+                 SystemKeyspace.persistLocalMetadata(config::hostId);
 -                SystemKeyspaceMigrator40.migrate();
 +                SystemKeyspaceMigrator41.migrate();
  
                  // Same order to populate tokenMetadata for the first time,
                  // see org.apache.cassandra.service.CassandraDaemon.setup
@@@ -797,6 -754,9 +797,7 @@@
      @Override
      public Future<Void> shutdown(boolean graceful)
      {
 -        if (!graceful)
 -            MessagingService.instance().shutdown(1L, MINUTES, false, true);
 -
++        inInstancelogger.info("Shutting down instance {} / {}", config.num(), config.broadcastAddress().getHostString());
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
  
@@@ -810,21 -770,12 +811,26 @@@
              }
  
              error = parallelRun(error, executor, StorageService.instance::disableAutoCompaction);
+             while (CompactionManager.instance.hasOngoingOrPendingTasks() && !Thread.currentThread().isInterrupted())
+             {
+                 inInstancelogger.info("Waiting for compactions to finish");
+                 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+             }
  
 +            // trigger init early or else it could try to init and touch a thread pool that got shut down
 +            HintsService hints = HintsService.instance;
 +            ThrowingRunnable shutdownHints = () -> {
 +                // this is to allow shutdown in the case hints were halted already
 +                try
 +                {
 +                    HintsService.instance.shutdownBlocking();
 +                }
 +                catch (IllegalStateException e)
 +                {
 +                    if (!"HintsService has already been shut 
down".equals(e.getMessage()))
 +                        throw e;
 +                }
 +            };
              error = parallelRun(error, executor,
                                  () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
                                  CompactionManager.instance::forceShutdown,
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index a27daac0e6,037c221a37..92c56d6267
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -28,10 -27,11 +28,9 @@@ import java.util.Map
  import java.util.TreeMap;
  import java.util.UUID;
  import java.util.function.Function;
 -
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import java.util.stream.Collectors;
  
  import com.vdurmont.semver4j.Semver;
- 
  import org.apache.cassandra.distributed.api.Feature;
  import org.apache.cassandra.distributed.api.IInstanceConfig;
  import org.apache.cassandra.distributed.shared.NetworkTopology;
diff --cc test/distributed/org/apache/cassandra/distributed/test/SSTableIdGenerationTest.java
index 8d05f3dc71,0000000000..dd37bd3d60
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SSTableIdGenerationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableIdGenerationTest.java
@@@ -1,531 -1,0 +1,531 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.nio.file.Files;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableSet;
 +import org.apache.commons.io.FileUtils;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 +import org.apache.cassandra.db.compaction.DateTieredCompactionStrategy;
 +import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 +import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
 +import org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.Feature;
 +import org.apache.cassandra.distributed.api.IInstance;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.SimpleQueryResult;
 +import org.apache.cassandra.distributed.shared.ClusterUtils;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;
 +import org.apache.cassandra.io.sstable.UUIDBasedSSTableId;
 +import org.apache.cassandra.io.util.File;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.tools.SystemExitException;
 +import org.apache.cassandra.utils.TimeUUID;
 +import org.assertj.core.api.Assertions;
 +import org.assertj.core.data.Offset;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.Util.bulkLoadSSTables;
 +import static org.apache.cassandra.Util.getBackups;
 +import static org.apache.cassandra.Util.getSSTables;
 +import static org.apache.cassandra.Util.getSnapshots;
 +import static org.apache.cassandra.Util.relativizePath;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 +import static org.apache.cassandra.db.SystemKeyspace.LEGACY_SSTABLE_ACTIVITY;
 +import static org.apache.cassandra.db.SystemKeyspace.SSTABLE_ACTIVITY_V2;
 +import static org.apache.cassandra.distributed.shared.FutureUtils.waitOn;
 +import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
 +import static org.assertj.core.api.Assertions.assertThat;
 +
 +public class SSTableIdGenerationTest extends TestBaseImpl
 +{
 +    private final static String ENABLE_UUID_FIELD_NAME = "uuid_sstable_identifiers_enabled";
 +    private final static String SNAPSHOT_TAG = "test";
 +
 +    private int v;
 +
 +    private static SecurityManager originalSecurityManager;
 +
 +    @BeforeClass
 +    public static void beforeClass() throws Throwable
 +    {
 +        TestBaseImpl.beforeClass();
 +
 +        originalSecurityManager = System.getSecurityManager();
 +        // we prevent system exit and convert it to an exception because this is one of the expected test outcomes,
 +        // and we want to make an assertion on that
 +        ClusterUtils.preventSystemExit();
 +    }
 +
 +    @AfterClass
 +    public static void afterClass() throws Throwable
 +    {
 +        System.setSecurityManager(originalSecurityManager);
 +    }
 +
 +    /**
 +     * This test verifies that a node with uuid disabled creates sstables with sequential ids and
 +     * updates both the current and legacy sstable activity tables.
 +     * Then, once uuid is enabled, new sstables are created with uuid ids while the old ones remain
 +     * readable, and only the current sstable activity table is updated.
 +     */
 +    @Test
 +    public void testRestartWithUUIDEnabled() throws IOException
 +    {
 +        try (Cluster cluster = init(Cluster.build(1)
 +                                           .withDataDirCount(1)
 +                                           .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, false))
 +                                           .start()))
 +        {
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl", null));
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl", 1, 2);
 +            assertSSTablesCount(cluster.get(1), 2, 0, KEYSPACE, "tbl");
 +            verfiySSTableActivity(cluster, true);
 +
 +            restartNode(cluster, 1, true);
 +
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl", 3, 4);
-             assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl");
++            assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl");
 +            verfiySSTableActivity(cluster, false);
 +
 +            checkRowsNumber(cluster.get(1), KEYSPACE, "tbl", 9);
 +        }
 +    }
 +
 +    /**
 +     * This test verifies that we should not be able to start a node with uuid disabled when there are uuid sstables.
 +     */
 +    @Test
 +    public void testRestartWithUUIDDisabled() throws IOException
 +    {
 +        try (Cluster cluster = init(Cluster.build(1)
 +                                           .withDataDirCount(1)
 +                                           .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, true))
 +                                           .start()))
 +        {
 +            cluster.disableAutoCompaction(KEYSPACE);
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl", null));
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl", 1, 2);
 +            assertSSTablesCount(cluster.get(1), 0, 2, KEYSPACE, "tbl");
 +            verfiySSTableActivity(cluster, false);
 +
 +            Assertions.assertThatExceptionOfType(RuntimeException.class)
 +                      .isThrownBy(() -> restartNode(cluster, 1, false))
 +                      .withCauseInstanceOf(SystemExitException.class);
 +        }
 +    }
 +
 +    @Test
 +    public final void testCompactionStrategiesWithMixedSSTables() throws Exception
 +    {
 +        testCompactionStrategiesWithMixedSSTables(SizeTieredCompactionStrategy.class,
 +                                                  DateTieredCompactionStrategy.class,
 +                                                  TimeWindowCompactionStrategy.class,
 +                                                  LeveledCompactionStrategy.class);
 +    }
 +
 +    /**
 +     * The purpose of this test is to verify that we can compact a mix of sstables created with
 +     * sequential ids and with uuids using the given strategy. Then we verify that the result
 +     * matches the number of rows we would get by merging data from the initial sstables.
 +     */
 +    @SafeVarargs
 +    private final void testCompactionStrategiesWithMixedSSTables(final Class<? extends AbstractCompactionStrategy>... compactionStrategyClasses) throws Exception
 +    {
 +        try (Cluster cluster = init(Cluster.build(1)
 +                                           .withDataDirCount(1)
 +                                           .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, false))
 +                                           .start()))
 +        {
 +            // create a table and two sstables with sequential id for each strategy, the sstables will contain overlapping partitions
 +            for (Class<? extends AbstractCompactionStrategy> compactionStrategyClass : compactionStrategyClasses)
 +            {
 +                String tableName = "tbl_" + compactionStrategyClass.getSimpleName().toLowerCase();
 +                cluster.schemaChange(createTableStmt(KEYSPACE, tableName, compactionStrategyClass));
 +
 +                createSSTables(cluster.get(1), KEYSPACE, tableName, 1, 2);
 +                assertSSTablesCount(cluster.get(1), 2, 0, KEYSPACE, tableName);
 +            }
 +
 +            // restart the node with uuid enabled
 +            restartNode(cluster, 1, true);
 +
 +            // create another two sstables with uuid for each previously created table
 +            for (Class<? extends AbstractCompactionStrategy> compactionStrategyClass : compactionStrategyClasses)
 +            {
 +                String tableName = "tbl_" + compactionStrategyClass.getSimpleName().toLowerCase();
 +
 +                createSSTables(cluster.get(1), KEYSPACE, tableName, 3, 4);
 +
 +                // expect to have a mix of sstables with sequential id and uuid
-                 assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, tableName);
++                assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, tableName);
 +
 +                // after compaction, we expect to have a single sstable with uuid
 +                cluster.get(1).forceCompact(KEYSPACE, tableName);
 +                assertSSTablesCount(cluster.get(1), 0, 1, KEYSPACE, tableName);
 +
 +                // verify the number of rows
 +                checkRowsNumber(cluster.get(1), KEYSPACE, tableName, 9);
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void testStreamingToNodeWithUUIDEnabled() throws Exception
 +    {
 +        testStreaming(true);
 +    }
 +
 +    @Test
 +    public void testStreamingToNodeWithUUIDDisabled() throws Exception
 +    {
 +        testStreaming(false);
 +    }
 +
 +    /**
 +     * The purpose of this test case is to verify the scenario where we need to stream mixed UUID and seq
 +     * sstables to a node which has: 1) UUID disabled, and 2) UUID enabled; then verify that we can read
 +     * all the data properly from that node alone.
 +     */
 +    private void testStreaming(boolean uuidEnabledOnTargetNode) throws Exception
 +    {
 +        // start both nodes with uuid disabled
 +        try (Cluster cluster = init(Cluster.build(2)
 +                                           .withDataDirCount(1)
 +                                           .withConfig(config -> config.set(ENABLE_UUID_FIELD_NAME, false).with(Feature.NETWORK))
 +                                           .start()))
 +        {
 +            // create an empty table and shut down node 2
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl", null));
 +            waitOn(cluster.get(2).shutdown());
 +
 +            // create 2 sstables with overlapping partitions on node 1 (with seq ids)
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl", 1, 2);
 +
 +            // restart node 1 with uuid enabled
 +            restartNode(cluster, 1, true);
 +
 +            // create 2 sstables with overlapping partitions on node 1 (with UUID ids)
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl", 3, 4);
 +
-             assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl");
++            assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl");
 +
 +            // now start node 2 with the configured uuid setting and perform repair
 +            cluster.get(2).config().set(ENABLE_UUID_FIELD_NAME, uuidEnabledOnTargetNode);
 +            cluster.get(2).startup();
 +
 +            assertSSTablesCount(cluster.get(2), 0, 0, KEYSPACE, "tbl");
 +
 +            // at this point we have sstables with seq and uuid ids on node 1 and no sstables on node 2
-             // when we run repair, we expect streaming all 5 sstables from node 1 to node 2
++            // when we run repair, we expect streaming all 4 sstables from node 1 to node 2
 +
 +            cluster.get(2).nodetool("repair", KEYSPACE);
 +
 +            if (uuidEnabledOnTargetNode)
-                 assertSSTablesCount(cluster.get(2), 0, 5, KEYSPACE, "tbl");
++                assertSSTablesCount(cluster.get(2), 0, 4, KEYSPACE, "tbl");
 +            else
-                 assertSSTablesCount(cluster.get(2), 5, 0, KEYSPACE, "tbl");
++                assertSSTablesCount(cluster.get(2), 4, 0, KEYSPACE, "tbl");
 +
 +            waitOn(cluster.get(1).shutdown());
 +
 +            checkRowsNumber(cluster.get(2), KEYSPACE, "tbl", 9);
 +        }
 +    }
 +
 +    @Test
 +    public void testSnapshot() throws Exception
 +    {
 +        File tmpDir = new File(Files.createTempDirectory("test"));
 +        Set<String> seqOnlyBackupDirs;
 +        Set<String> seqAndUUIDBackupDirs;
 +        Set<String> uuidOnlyBackupDirs;
 +        try (Cluster cluster = init(Cluster.build(1)
 +                                           .withDataDirCount(1)
 +                                           .withConfig(config -> config.with(Feature.NETWORK)
 +                                                                       .set("incremental_backups", true)
 +                                                                       .set("snapshot_before_compaction", false)
 +                                                                       .set("auto_snapshot", false)
 +                                                                       .set(ENABLE_UUID_FIELD_NAME, false))
 +                                           .start()))
 +        {
 +            // create the tables
 +
 +            cluster.schemaChange("CREATE KEYSPACE new_ks WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': 1};");
 +
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_only", null));
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_and_uuid", null));
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_uuid_only", null));
 +            cluster.schemaChange(createTableStmt("new_ks", "tbl_seq_only", null));
 +            cluster.schemaChange(createTableStmt("new_ks", "tbl_seq_and_uuid", null));
 +            cluster.schemaChange(createTableStmt("new_ks", "tbl_uuid_only", null));
 +
 +            // creating sstables
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl_seq_only", 1, 2, 3, 4);
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid", 1, 2);
 +            createSSTables(cluster.get(1), "new_ks", "tbl_seq_only", 5, 6, 7, 8);
 +            createSSTables(cluster.get(1), "new_ks", "tbl_seq_and_uuid", 5, 6);
 +
 +            restartNode(cluster, 1, true);
 +
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid", 3, 4);
 +            createSSTables(cluster.get(1), KEYSPACE, "tbl_uuid_only", 1, 2, 3, 4);
 +            createSSTables(cluster.get(1), "new_ks", "tbl_seq_and_uuid", 7, 8);
 +            createSSTables(cluster.get(1), "new_ks", "tbl_uuid_only", 5, 6, 7, 8);
 +
 +            Set<String> seqOnlySnapshotDirs = snapshot(cluster.get(1), KEYSPACE, "tbl_seq_only");
 +            Set<String> seqAndUUIDSnapshotDirs = snapshot(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid");
 +            Set<String> uuidOnlySnapshotDirs = snapshot(cluster.get(1), KEYSPACE, "tbl_uuid_only");
 +
 +            seqOnlyBackupDirs = getBackupDirs(cluster.get(1), KEYSPACE, "tbl_seq_only");
 +            seqAndUUIDBackupDirs = getBackupDirs(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid");
 +            uuidOnlyBackupDirs = getBackupDirs(cluster.get(1), KEYSPACE, "tbl_uuid_only");
 +
 +            // at this point, we should have sstables with backups and snapshots for all tables
-             assertSSTablesCount(cluster.get(1), 4, 1, KEYSPACE, "tbl_seq_only");
-             assertSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl_seq_and_uuid");
++            assertSSTablesCount(cluster.get(1), 4, 0, KEYSPACE, "tbl_seq_only");
++            assertSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl_seq_and_uuid");
 +            assertSSTablesCount(cluster.get(1), 0, 4, KEYSPACE, "tbl_uuid_only");
 +
-             assertBackupSSTablesCount(cluster.get(1), 4, 1, KEYSPACE, "tbl_seq_only");
-             assertBackupSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl_seq_and_uuid");
++            assertBackupSSTablesCount(cluster.get(1), 4, 0, KEYSPACE, "tbl_seq_only");
++            assertBackupSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl_seq_and_uuid");
 +            assertBackupSSTablesCount(cluster.get(1), 0, 4, KEYSPACE, "tbl_uuid_only");
 +
-             assertSnapshotSSTablesCount(cluster.get(1), 4, 1, KEYSPACE, "tbl_seq_only");
-             assertSnapshotSSTablesCount(cluster.get(1), 2, 3, KEYSPACE, "tbl_seq_and_uuid");
++            assertSnapshotSSTablesCount(cluster.get(1), 4, 0, KEYSPACE, "tbl_seq_only");
++            assertSnapshotSSTablesCount(cluster.get(1), 2, 2, KEYSPACE, "tbl_seq_and_uuid");
 +            assertSnapshotSSTablesCount(cluster.get(1), 0, 4, KEYSPACE, "tbl_uuid_only");
 +
 +            checkRowsNumber(cluster.get(1), KEYSPACE, "tbl_seq_only", 9);
 +            checkRowsNumber(cluster.get(1), KEYSPACE, "tbl_seq_and_uuid", 9);
 +            checkRowsNumber(cluster.get(1), KEYSPACE, "tbl_uuid_only", 9);
 +
 +            // truncate the first set of tables
 +            truncateAndAssertEmpty(cluster.get(1), KEYSPACE, "tbl_seq_only", "tbl_seq_and_uuid", "tbl_uuid_only");
 +
 +            restore(cluster.get(1), seqOnlySnapshotDirs, "tbl_seq_only", 9);
 +            restore(cluster.get(1), seqAndUUIDSnapshotDirs, "tbl_seq_and_uuid", 9);
 +            restore(cluster.get(1), uuidOnlySnapshotDirs, "tbl_uuid_only", 9);
 +
 +            truncateAndAssertEmpty(cluster.get(1), KEYSPACE, "tbl_seq_only", "tbl_seq_and_uuid", "tbl_uuid_only");
 +
 +            restore(cluster.get(1), seqOnlyBackupDirs, "tbl_seq_only", 9);
 +            restore(cluster.get(1), seqAndUUIDBackupDirs, "tbl_seq_and_uuid", 9);
 +            restore(cluster.get(1), uuidOnlyBackupDirs, "tbl_uuid_only", 9);
 +
 +            ImmutableSet<String> allBackupDirs = ImmutableSet.<String>builder().addAll(seqOnlyBackupDirs).addAll(seqAndUUIDBackupDirs).addAll(uuidOnlyBackupDirs).build();
 +            cluster.get(1).runOnInstance(rethrow(() -> allBackupDirs.forEach(dir -> bulkLoadSSTables(new File(dir), "new_ks"))));
 +
 +            checkRowsNumber(cluster.get(1), "new_ks", "tbl_seq_only", 17);
 +            checkRowsNumber(cluster.get(1), "new_ks", "tbl_seq_and_uuid", 17);
 +            checkRowsNumber(cluster.get(1), "new_ks", "tbl_uuid_only", 17);
 +
 +
 +            for (String dir : allBackupDirs)
 +            {
 +                File src = new File(dir);
 +                File dest = relativizePath(tmpDir, src, 3);
 +                Files.createDirectories(dest.parent().toPath());
 +                FileUtils.moveDirectory(src.toJavaIOFile(), dest.toJavaIOFile());
 +            }
 +        }
 +
 +        try (Cluster cluster = init(Cluster.build(1)
 +                                           .withDataDirCount(1)
 +                                           .withConfig(config -> config.with(Feature.NETWORK, Feature.NATIVE_PROTOCOL)
 +                                                                       .set("incremental_backups", true)
 +                                                                       .set("snapshot_before_compaction", false)
 +                                                                       .set("auto_snapshot", false)
 +                                                                       .set(ENABLE_UUID_FIELD_NAME, false))
 +                                           .start()))
 +        {
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_only", null));
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_seq_and_uuid", null));
 +            cluster.schemaChange(createTableStmt(KEYSPACE, "tbl_uuid_only", null));
 +
 +            Function<String, String> relativeToTmpDir = d -> relativizePath(tmpDir, new File(d), 3).toString();
 +            restore(cluster.get(1), seqOnlyBackupDirs.stream().map(relativeToTmpDir).collect(Collectors.toSet()), "tbl_seq_only", 9);
 +            restore(cluster.get(1), seqAndUUIDBackupDirs.stream().map(relativeToTmpDir).collect(Collectors.toSet()), "tbl_seq_and_uuid", 9);
 +            restore(cluster.get(1), uuidOnlyBackupDirs.stream().map(relativeToTmpDir).collect(Collectors.toSet()), "tbl_uuid_only", 9);
 +        }
 +    }
 +
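 +    // Imports SSTables from the given directories into the target table, then verifies no failed imports and the expected row count.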
 +    private static void restore(IInvokableInstance instance, Set<String> dirs, String targetTableName, int expectedRowsNum)
 +    {
 +        List<String> failedImports = instance.callOnInstance(() -> ColumnFamilyStore.getIfExists(KEYSPACE, targetTableName)
 +                                                                                    .importNewSSTables(dirs, false, false, true, true, true, true, true));
 +        assertThat(failedImports).isEmpty();
 +        checkRowsNumber(instance, KEYSPACE, targetTableName, expectedRowsNum);
 +    }
 +
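 +    // Truncates each given table and asserts it ends up with no SSTables and no rows.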
 +    private static void truncateAndAssertEmpty(IInvokableInstance instance, String ks, String... tableNames)
 +    {
 +        for (String tableName : tableNames)
 +        {
 +            instance.executeInternal(format("TRUNCATE %s.%s", ks, tableName));
 +            assertSSTablesCount(instance, 0, 0, ks, tableName);
 +            checkRowsNumber(instance, ks, tableName, 0);
 +        }
 +    }
 +
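 +    // Takes a snapshot of the table and returns the (asserted non-empty) set of snapshot directories.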
 +    private static Set<String> snapshot(IInvokableInstance instance, String ks, String tableName)
 +    {
 +        Set<String> snapshotDirs = instance.callOnInstance(() -> ColumnFamilyStore.getIfExists(ks, tableName)
 +                                                                                  .snapshot(SNAPSHOT_TAG)
 +                                                                                  .getDirectories()
 +                                                                                  .stream()
 +                                                                                  .map(File::toString)
 +                                                                                  .collect(Collectors.toSet()));
 +        assertThat(snapshotDirs).isNotEmpty();
 +        return snapshotDirs;
 +    }
 +
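 +    // Builds a CREATE TABLE statement with compaction disabled; defaults to SizeTieredCompactionStrategy when no strategy is given.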
 +    private static String createTableStmt(String ks, String name, Class<? extends AbstractCompactionStrategy> compactionStrategy)
 +    {
 +        if (compactionStrategy == null)
 +            compactionStrategy = SizeTieredCompactionStrategy.class;
 +        return format("CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck)) " +
 +                      "WITH compaction = {'class':'%s', 'enabled':'false'}",
 +                      ks, name, compactionStrategy.getCanonicalName());
 +    }
 +
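 +    // Inserts three rows per record and flushes after each one, producing one SSTable per record.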
 +    private void createSSTables(IInstance instance, String ks, String tableName, int... records)
 +    {
 +        String insert = format("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", ks, tableName);
 +        for (int record : records)
 +        {
 +            instance.executeInternal(insert, record, record, ++v);
 +            instance.executeInternal(insert, record, record + 1, ++v);
 +            instance.executeInternal(insert, record + 1, record + 1, ++v);
 +            instance.flush(ks);
 +        }
 +    }
 +
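 +    // Partitions the descriptors by id type and asserts the expected numbers of sequence-based and UUID-based SSTables.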
 +    private static void assertSSTablesCount(Set<Descriptor> descs, String tableName, int expectedSeqGenIds, int expectedUUIDGenIds)
 +    {
 +        List<String> seqSSTables = descs.stream().filter(desc -> desc.id instanceof SequenceBasedSSTableId).map(Descriptor::baseFilename).sorted().collect(Collectors.toList());
 +        List<String> uuidSSTables = descs.stream().filter(desc -> desc.id instanceof UUIDBasedSSTableId).map(Descriptor::baseFilename).sorted().collect(Collectors.toList());
 +        assertThat(seqSSTables).describedAs("SSTables of %s with sequence based id", tableName).hasSize(expectedSeqGenIds);
 +        assertThat(uuidSSTables).describedAs("SSTables of %s with UUID based id", tableName).hasSize(expectedUUIDGenIds);
 +    }
 +
 +    private static void assertSSTablesCount(IInvokableInstance instance, int expectedSeqGenIds, int expectedUUIDGenIds, String ks, String... tableNames)
 +    {
 +        instance.runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getSSTables(ks, tableName), tableName, expectedSeqGenIds, expectedUUIDGenIds))));
 +    }
 +
 +    private static void assertSnapshotSSTablesCount(IInvokableInstance instance, int expectedSeqGenIds, int expectedUUIDGenIds, String ks, String... tableNames)
 +    {
 +        instance.runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getSnapshots(ks, tableName, SNAPSHOT_TAG), tableName, expectedSeqGenIds, expectedUUIDGenIds))));
 +    }
 +
 +    private static void assertBackupSSTablesCount(IInvokableInstance instance, int expectedSeqGenIds, int expectedUUIDGenIds, String ks, String... tableNames)
 +    {
 +        instance.runOnInstance(rethrow(() -> Arrays.stream(tableNames).forEach(tableName -> assertSSTablesCount(getBackups(ks, tableName), tableName, expectedSeqGenIds, expectedUUIDGenIds))));
 +    }
 +
 +    private static Set<String> getBackupDirs(IInvokableInstance instance, String ks, String tableName)
 +    {
 +        return instance.callOnInstance(() -> getBackups(ks, tableName).stream()
 +                                                                      .map(d -> d.directory)
 +                                                                      .map(File::toString)
 +                                                                      .collect(Collectors.toSet()));
 +    }
 +
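 +    // Persists and clears SSTable read meters for both id types, verifying the activity rows (and optionally the legacy table).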
 +    private static void verfiySSTableActivity(Cluster cluster, boolean expectLegacyTableIsPopulated)
 +    {
 +        cluster.get(1).runOnInstance(() -> {
 +            RestorableMeter meter = new RestorableMeter(15, 120);
 +            SequenceBasedSSTableId seqGenId = new SequenceBasedSSTableId(1);
 +            SystemKeyspace.persistSSTableReadMeter("ks", "tab", seqGenId, meter);
 +            assertThat(SystemKeyspace.getSSTableReadMeter("ks", "tab", seqGenId)).matches(m -> m.fifteenMinuteRate() == meter.fifteenMinuteRate()
 +                                                                                               && m.twoHourRate() == meter.twoHourRate());
 +
 +            checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, seqGenId.toString(), true);
 +            if (expectLegacyTableIsPopulated)
 +                checkSSTableActivityRow(LEGACY_SSTABLE_ACTIVITY, seqGenId.generation, true);
 +
 +            SystemKeyspace.clearSSTableReadMeter("ks", "tab", seqGenId);
 +
 +            checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, seqGenId.toString(), false);
 +            if (expectLegacyTableIsPopulated)
 +                checkSSTableActivityRow(LEGACY_SSTABLE_ACTIVITY, seqGenId.generation, false);
 +
 +            UUIDBasedSSTableId uuidGenId = new UUIDBasedSSTableId(TimeUUID.Generator.nextTimeUUID());
 +            SystemKeyspace.persistSSTableReadMeter("ks", "tab", uuidGenId, meter);
 +            assertThat(SystemKeyspace.getSSTableReadMeter("ks", "tab", uuidGenId)).matches(m -> m.fifteenMinuteRate() == meter.fifteenMinuteRate()
 +                                                                                                && m.twoHourRate() == meter.twoHourRate());
 +
 +            checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, uuidGenId.toString(), true);
 +
 +            SystemKeyspace.clearSSTableReadMeter("ks", "tab", uuidGenId);
 +
 +            checkSSTableActivityRow(SSTABLE_ACTIVITY_V2, uuidGenId.toString(), false);
 +        });
 +    }
 +
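 +    // Looks up the activity row for the given id and asserts its presence and rates, or its absence.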
 +    private static void checkSSTableActivityRow(String table, Object genId, boolean expectExists)
 +    {
 +        String tableColName = SSTABLE_ACTIVITY_V2.equals(table) ? "table_name" : "columnfamily_name";
 +        String idColName = SSTABLE_ACTIVITY_V2.equals(table) ? "id" : "generation";
 +        String cql = "SELECT rate_15m, rate_120m FROM system.%s WHERE keyspace_name=? and %s=? and %s=?";
 +        UntypedResultSet results = executeInternal(format(cql, table, tableColName, idColName), "ks", "tab", genId);
 +        assertThat(results).isNotNull();
 +
 +        if (expectExists)
 +        {
 +            assertThat(results.isEmpty()).isFalse();
 +            UntypedResultSet.Row row = results.one();
 +            assertThat(row.getDouble("rate_15m")).isEqualTo(15d, Offset.offset(0.001d));
 +            assertThat(row.getDouble("rate_120m")).isEqualTo(120d, Offset.offset(0.001d));
 +        }
 +        else
 +        {
 +            assertThat(results.isEmpty()).isTrue();
 +        }
 +    }
 +
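 +    // Restarts the node with the UUID-based SSTable identifier option toggled as requested.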
 +    private static void restartNode(Cluster cluster, int node, boolean uuidEnabled)
 +    {
 +        waitOn(cluster.get(node).shutdown());
 +        cluster.get(node).config().set(ENABLE_UUID_FIELD_NAME, uuidEnabled);
 +        cluster.get(node).startup();
 +    }
 +
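 +    // Asserts that a full scan of the table returns exactly the expected number of rows.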
 +    private static void checkRowsNumber(IInstance instance, String ks, String tableName, int expectedNumber)
 +    {
 +        SimpleQueryResult result = instance.executeInternalWithResult(format("SELECT * FROM %s.%s", ks, tableName));
 +        Object[][] rows = result.toObjectArrays();
 +        assertThat(rows).withFailMessage("Invalid results for %s.%s - should have %d rows but has %d: \n%s", ks, tableName, expectedNumber,
 +                                         rows.length, result.toString()).hasSize(expectedNumber);
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java
index c673458d86,445fc67c00..475f1a162f
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@@ -350,10 -348,9 +350,9 @@@ public class KeyCacheTes
  
          CacheService.instance.keyCache.loadSaved();
  
 -        // Here max time to load cache is negative which means no time left to load cache. So the keyCache size should
 +        // Here max time to load cache is zero which means no time left to load cache. So the keyCache size should
          // be zero after loadSaved().
          assertKeyCacheSize(0, KEYSPACE1, cf);
-         assertEquals(0, CacheService.instance.keyCache.size());
      }
  
      @Test
diff --cc test/unit/org/apache/cassandra/db/compaction/NeverPurgeTest.java
index 089ede818e,a5388ca037..42537319ff
--- a/test/unit/org/apache/cassandra/db/compaction/NeverPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/NeverPurgeTest.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.cassandra.db.compact
  
  import java.util.Collection;
  
+ import org.junit.BeforeClass;
  import org.junit.Test;
  
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.cql3.CQLTester;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;

