This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6b3958f1d83777e295690a663610b6b29ed1efae
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Fri Nov 17 17:13:30 2023 +0100

    Improve setup and initialisation of LocalLog/LogSpec
    
    Patch by Alex Petrov; reviewed by Sam Tunnicliffe and marcuse for 
CASSANDRA-19271
---
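
The diff below replaces the static LocalLog.sync()/async() factories with a fluent LocalLog.logSpec() builder, and moves log replay, listener registration and the initial listener notification into LocalLog.ready(). A minimal sketch of the new construction path, assuming only the API introduced in this patch (the wrapper class and method name are illustrative, not part of the change):

    import org.apache.cassandra.exceptions.StartupException;
    import org.apache.cassandra.tcm.log.LocalLog;
    import org.apache.cassandra.tcm.log.LogStorage;

    class LocalLogSetupSketch
    {
        static LocalLog open() throws StartupException
        {
            LocalLog log = LocalLog.logSpec()
                                   .withStorage(LogStorage.SystemKeyspace) // persist committed entries
                                   .withDefaultListeners()                 // schema/placement/snapshot listeners
                                   .sync()                                 // tests and tools; async() is the default
                                   .createLog();
            log.ready(); // replays persisted entries, registers listeners, fires pre/post-commit notifications
            return log;
        }
    }
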
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/schema/DistributedSchema.java |   9 +-
 .../cassandra/tcm/ClusterMetadataService.java      |  38 ++--
 src/java/org/apache/cassandra/tcm/Period.java      |   5 +-
 src/java/org/apache/cassandra/tcm/Startup.java     |  52 ++---
 .../cassandra/tcm/StubClusterMetadataService.java  |   8 +-
 .../cassandra/tcm/listeners/SchemaListener.java    |  10 +-
 .../org/apache/cassandra/tcm/log/LocalLog.java     | 231 ++++++++++++---------
 .../org/apache/cassandra/tcm/log/LogStorage.java   |   1 +
 .../cassandra/tcm/log/SystemKeyspaceStorage.java   |   5 +
 .../distributed/test/jmx/JMXFeatureTest.java       |   3 +-
 .../distributed/test/log/CMSTestBase.java          |   9 +-
 .../test/log/ClusterMetadataTestHelper.java        |  64 +++---
 .../test/log/CoordinatorPathTestBase.java          |  18 +-
 .../distributed/test/ring/DecommissionTest.java    |   9 +-
 .../unit/org/apache/cassandra/ServerTestUtils.java |  41 ++--
 .../apache/cassandra/hints/HintsUpgradeTest.java   |   2 +
 .../cassandra/service/ClientWarningsTest.java      |  67 +++---
 .../apache/cassandra/tcm/BootWithMetadataTest.java |   3 +-
 .../cassandra/tcm/DiscoverySimulationTest.java     |   7 +-
 .../org/apache/cassandra/tcm/LogStateTest.java     |  16 +-
 .../org/apache/cassandra/tcm/log/LocalLogTest.java |  25 ++-
 .../tcm/log/LogListenerNotificationTest.java       |   9 +-
 .../io/sstable/StressCQLSSTableWriter.java         |   3 -
 24 files changed, 370 insertions(+), 266 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5ad47d72bd..64841aa886 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Improve setup and initialisation of LocalLog/LogSpec (CASSANDRA-19271)
  * Refactor structure of caching metrics and expose auth cache metrics via JMX 
(CASSANDRA-17062)
  * Allow CQL client certificate authentication to work without sending an 
AUTHENTICATE request (CASSANDRA-18857)
  * Extend nodetool tpstats and system_views.thread_pools with detailed pool 
parameters (CASSANDRA-19289) 
diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java 
b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index fbdf9c1c88..86dd1d5117 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -123,6 +123,10 @@ public class DistributedSchema implements 
MetadataValue<DistributedSchema>
     {
         keyspaceInstances.putAll(prev.keyspaceInstances);
 
+        // If there are keyspaces in schema, but none of them are initialised, 
we're in first boot. Initialise all.
+        if (!prev.isEmpty() && prev.keyspaceInstances.isEmpty())
+            prev = DistributedSchema.empty();
+
         Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prev.getKeyspaces(), 
getKeyspaces());
 
         SchemaChangeNotifier schemaChangeNotifier = 
Schema.instance.schemaChangeNotifier();
@@ -148,8 +152,8 @@ public class DistributedSchema implements 
MetadataValue<DistributedSchema>
                 assert delta.before.name.equals(delta.after.name);
 
                 // drop tables and views
-                delta.views.dropped.forEach(v -> dropView(keyspace, v, true));
-                delta.tables.dropped.forEach(t -> dropTable(keyspace, t, 
true));
+                delta.views.dropped.forEach(v -> dropView(keyspace, v, 
loadSSTables));
+                delta.tables.dropped.forEach(t -> dropTable(keyspace, t, 
loadSSTables));
 
                 // add tables and views
                 delta.tables.created.forEach(t -> createTable(keyspace, t, 
loadSSTables));
@@ -164,7 +168,6 @@ public class DistributedSchema implements 
MetadataValue<DistributedSchema>
                 
keyspace.viewManager.reload(keyspaces.get(keyspace.getName()).get());
             }
 
-            //schemaChangeNotifier.notifyKeyspaceAltered(delta);
             SchemaDiagnostics.keyspaceAltered(Schema.instance, delta);
         });
 
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 01abe4abdf..1077e2dded 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.FileInputStreamPlus;
 import org.apache.cassandra.io.util.FileOutputStreamPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -47,10 +48,10 @@ import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.listeners.SchemaListener;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.tcm.log.LogStorage;
 import org.apache.cassandra.tcm.log.Replication;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeVersion;
@@ -150,22 +151,21 @@ public class ClusterMetadataService
     ClusterMetadataService(PlacementProvider placementProvider,
                            Function<Processor, Processor> wrapProcessor,
                            Supplier<State> cmsStateSupplier,
-                           LocalLog.LogSpec logSpec)
+                           LocalLog.LogSpec logSpec) throws StartupException
     {
         this.placementProvider = placementProvider;
         this.snapshots = new 
MetadataSnapshots.SystemKeyspaceMetadataSnapshots();
 
         Processor localProcessor;
-        LogStorage logStorage = LogStorage.SystemKeyspace;
         if 
(CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.getBoolean())
         {
-            log = LocalLog.sync(logSpec);
+            log = logSpec.sync().createLog();
             localProcessor = wrapProcessor.apply(new 
AtomicLongBackedProcessor(log, logSpec.isReset()));
-            fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> 
logStorage.getLogState(e));
+            fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> 
logSpec.storage().getLogState(e));
         }
         else
         {
-            log = LocalLog.async(logSpec);
+            log = logSpec.async().createLog();
             localProcessor = wrapProcessor.apply(new 
PaxosBackedProcessor(log));
             fetchLogHandler = new FetchCMSLog.Handler();
         }
@@ -243,13 +243,23 @@ public class ClusterMetadataService
     {
         if (instance != null)
             return;
-        ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables(Collections.singleton("DC1"));
-        
emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty(),
 loadSSTables);
-        emptyFromSystemTables = emptyFromSystemTables.forceEpoch(Epoch.EMPTY);
-        LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withInitialState(emptyFromSystemTables)
-                                                         .withStorage(new 
AtomicLongBackedProcessor.InMemoryStorage());
-        LocalLog log = LocalLog.sync(logSpec);
-        log.ready();
+        ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables(Collections.singleton("DC1"))
+                                                .forceEpoch(Epoch.EMPTY);
+
+        LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           
.withInitialState(emptyFromSystemTables)
+                                           .loadSSTables(loadSSTables)
+                                           .withDefaultListeners(false)
+                                           .withListener(new 
SchemaListener(loadSSTables) {
+                                               @Override
+                                               public void 
notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean 
fromSnapshot)
+                                               {
+                                                   // we do not need a 
post-commit hook for tools
+                                               }
+                                           })
+                                           .sync()
+                                           .withStorage(new 
AtomicLongBackedProcessor.InMemoryStorage());
+        LocalLog log = logSpec.createLog();
         ClusterMetadataService cms = new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                 
MetadataSnapshots.NO_OP,
                                                                 log,
@@ -260,6 +270,8 @@ public class ClusterMetadataService
                                                                 null,
                                                                 null,
                                                                 new 
PeerLogFetcher(log));
+
+        log.readyUnchecked();
         log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
         ClusterMetadataService.setInstance(cms);
     }
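
The tool-initialisation hunk above shows the intended pattern for offline/tool use: disable the default listener set and register a SchemaListener whose loadSSTables flag matches the caller. A condensed sketch of that pattern, under the same assumptions (illustrative wrapper only, not part of the patch):

    import org.apache.cassandra.tcm.listeners.SchemaListener;
    import org.apache.cassandra.tcm.log.LocalLog;

    class ToolLogSketch
    {
        static LocalLog openWithoutSSTables()
        {
            return LocalLog.logSpec()
                           .loadSSTables(false)                      // recorded on the spec; used when default listeners are enabled
                           .withDefaultListeners(false)              // skip the standard listener set
                           .withListener(new SchemaListener(false))  // apply schema changes without loading sstables
                           .sync()
                           .createLog();
        }
    }
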
diff --git a/src/java/org/apache/cassandra/tcm/Period.java 
b/src/java/org/apache/cassandra/tcm/Period.java
index 32bf046232..e743504397 100644
--- a/src/java/org/apache/cassandra/tcm/Period.java
+++ b/src/java/org/apache/cassandra/tcm/Period.java
@@ -60,7 +60,10 @@ public class Period
      */
     public static long scanLogForPeriod(TableMetadata logTable, Epoch since)
     {
-        long currentPeriod = ClusterMetadata.current().period;
+        long currentPeriod = Period.EMPTY;
+        ClusterMetadata metadata = ClusterMetadata.currentNullable();
+        if (metadata != null)
+            currentPeriod = metadata.period;
         PeriodFinder visitor = currentPeriod > Period.FIRST
                                ? new ReversePeriodFinder(since, currentPeriod)
                                : new ForwardPeriodFinder(since);
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index c536bd3361..fcc2ff5fa7 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.NewGossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
@@ -141,18 +140,16 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
     public static void initializeAsNonCmsNode(Function<Processor, Processor> 
wrapProcessor) throws StartupException
     {
-        LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withStorage(LogStorage.SystemKeyspace)
-                                                         
.withDefaultListeners();
+        LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           
.withStorage(LogStorage.SystemKeyspace)
+                                           
.afterReplay(Startup::scrubDataDirectories)
+                                           .withDefaultListeners();
         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                       
wrapProcessor,
                                                                       
ClusterMetadataService::state,
                                                                       
logSpec));
-        ClusterMetadataService.instance().initRecentlySealedPeriodsIndex();
-        ClusterMetadataService.instance().log().replayPersisted();
-        ClusterMetadata replayed = 
ClusterMetadataService.instance().log().metadata();
-        scrubDataDirectories(replayed);
-        replayed.schema.initializeKeyspaceInstances(DistributedSchema.empty());
         ClusterMetadataService.instance().log().ready();
+        ClusterMetadataService.instance().initRecentlySealedPeriodsIndex();
 
         NodeId nodeId = ClusterMetadata.current().myNodeId();
         UUID currentHostId = SystemKeyspace.getLocalHostId();
@@ -179,6 +176,10 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         }
     }
 
+    public interface AfterReplay
+    {
+        void accept(ClusterMetadata t) throws StartupException;
+    }
     /**
      * Initialization for Discovery.
      *
@@ -243,15 +244,17 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
     public static void initializeFromGossip(Function<Processor, Processor> 
wrapProcessor, Runnable initMessaging) throws StartupException
     {
         ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables(SystemKeyspace.allKnownDatacenters());
-        LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withInitialState(emptyFromSystemTables)
-                                                         
.withStorage(LogStorage.SystemKeyspace)
-                                                         
.withDefaultListeners();
+        LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           
.withInitialState(emptyFromSystemTables)
+                                           
.afterReplay(Startup::scrubDataDirectories)
+                                           
.withStorage(LogStorage.SystemKeyspace)
+                                           .withDefaultListeners();
+
         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                       
wrapProcessor,
                                                                       
ClusterMetadataService::state,
                                                                       
logSpec));
-        scrubDataDirectories(emptyFromSystemTables);
-        
emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty());
+
         ClusterMetadataService.instance().log().ready();
         initMessaging.run();
         try
@@ -280,8 +283,9 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         assert cmGossip.equals(initial) : cmGossip + " != " + initial;
     }
 
-    public static void reinitializeWithClusterMetadata(String fileName, 
Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws 
IOException
+    public static void reinitializeWithClusterMetadata(String fileName, 
Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws 
IOException, StartupException
     {
+        ClusterMetadata prev = ClusterMetadata.currentNullable();
         // First set a minimal ClusterMetadata as some deserialization depends
         // on ClusterMetadata.current() to access the partitioner
         StubClusterMetadataService initial = 
StubClusterMetadataService.forClientTools();
@@ -298,25 +302,21 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
         if (!metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort()))
             throw new IllegalStateException("When reinitializing with cluster 
metadata, we must be in the CMS");
-        // can use local dc here since we know local host in the cms:
-        ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables(Collections.singleton(DatabaseDescriptor.getLocalDataCenter()));
-        metadata.schema.initializeKeyspaceInstances(DistributedSchema.empty());
+
         metadata = metadata.forceEpoch(metadata.epoch.nextEpoch());
         ClusterMetadataService.unsetInstance();
-        LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withInitialState(metadata)
-                                                         
.withStorage(LogStorage.SystemKeyspace)
-                                                         
.withDefaultListeners()
-                                                         .isReset(true)
-                                                         
.withReadyNotification(LocalLog.LogSpec.WhenReady.NONE);
+        LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           .withPreviousState(prev)
+                                           .withInitialState(metadata)
+                                           
.withStorage(LogStorage.SystemKeyspace)
+                                           .withDefaultListeners()
+                                           .isReset(true);
 
         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                       
wrapProcessor,
                                                                       
ClusterMetadataService::state,
                                                                       
logSpec));
-        // When re-intializing from a loaded metadata instance we need to fire 
notifications using the delta between an
-        // empty metadata and the loaded one. So we configure the LogSpec not 
to do any notifications and handle it
-        // explicitly here.
-        
ClusterMetadataService.instance().log().notifyListeners(emptyFromSystemTables);
+
         ClusterMetadataService.instance().log().ready();
         initMessaging.run();
         
ClusterMetadataService.instance().forceSnapshot(metadata.forceEpoch(metadata.nextEpoch()));
diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
index 066b33ceb0..475e8ef21b 100644
--- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
@@ -67,12 +67,16 @@ public class StubClusterMetadataService extends 
ClusterMetadataService
     {
         super(new UniformRangePlacement(),
               MetadataSnapshots.NO_OP,
-              LocalLog.sync(new LocalLog.LogSpec().withInitialState(initial)),
+              LocalLog.logSpec()
+                      .loadSSTables(false)
+                      .sync()
+                      .withInitialState(initial)
+                      .createLog(),
               new StubProcessor(),
               Commit.Replicator.NO_OP,
               false);
         this.metadata = initial;
-        this.log().ready();
+        this.log().readyUnchecked();
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java 
b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
index 2787f5eb2c..e3507bf2bb 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
@@ -29,17 +29,23 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 
 public class SchemaListener implements ChangeListener
 {
+    private final boolean loadSSTables;
+
+    public SchemaListener(boolean loadSSTables)
+    {
+        this.loadSSTables = loadSSTables;
+    }
+
     @Override
     public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next, 
boolean fromSnapshot)
     {
-        notifyInternal(prev, next, fromSnapshot, true);
+        notifyInternal(prev, next, fromSnapshot, loadSSTables);
     }
 
     protected void notifyInternal(ClusterMetadata prev, ClusterMetadata next, 
boolean fromSnapshot, boolean loadSSTables)
     {
         if (!fromSnapshot && 
next.schema.lastModified().equals(prev.schema.lastModified()))
             return;
-
         next.schema.initializeKeyspaceInstances(prev.schema, loadSSTables);
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 9f21dc9f03..ab519b4157 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -42,10 +42,12 @@ import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Startup;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.listeners.ChangeListener;
 import org.apache.cassandra.tcm.listeners.ClientNotificationListener;
@@ -96,25 +98,57 @@ public abstract class LocalLog implements Closeable
     // notification to them all.
     private final AtomicBoolean replayComplete = new AtomicBoolean();
 
-    public static class LogSpec
+    public static LogSpec logSpec()
     {
-        public enum WhenReady { NONE, PRE_COMMIT_ONLY, POST_COMMIT_ONLY, ALL};
+        return new LogSpec();
+    }
 
-        private ClusterMetadata initial = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
+    public static class LogSpec
+    {
+        private ClusterMetadata initial;
+        private ClusterMetadata prev;
+        private Startup.AfterReplay afterReplay = (metadata) -> {};
         private LogStorage storage = LogStorage.None;
+        private boolean async = true;
         private boolean defaultListeners = false;
-        private boolean reset = false;
-        private WhenReady whenReady = WhenReady.POST_COMMIT_ONLY;
+        private boolean isReset = false;
+        private boolean loadSSTables = true;
 
         private final Set<LogListener> listeners = new HashSet<>();
         private final Set<ChangeListener> changeListeners = new HashSet<>();
         private final Set<ChangeListener.Async> asyncChangeListeners = new 
HashSet<>();
 
+        private LogSpec()
+        {
+        }
+
+        /**
+         * Create a sync log - only used for tests and tools.
+         * @return this LogSpec, configured to create a synchronous log
+         */
+        public LogSpec sync()
+        {
+            this.async = false;
+            return this;
+        }
+
+        public LogSpec async()
+        {
+            this.async = true;
+            return this;
+        }
+
         public LogSpec withDefaultListeners()
         {
             return withDefaultListeners(true);
         }
 
+        public LogSpec loadSSTables(boolean loadSSTables)
+        {
+            this.loadSSTables = loadSSTables;
+            return this;
+        }
+
         public LogSpec withDefaultListeners(boolean withDefaultListeners)
         {
             if (withDefaultListeners &&
@@ -148,13 +182,18 @@ public abstract class LocalLog implements Closeable
 
         public LogSpec isReset(boolean isReset)
         {
-            reset = isReset;
+            this.isReset = isReset;
             return this;
         }
 
         public boolean isReset()
         {
-            return reset;
+            return this.isReset;
+        }
+
+        public LogStorage storage()
+        {
+            return storage;
         }
 
         public LogSpec withStorage(LogStorage storage)
@@ -163,17 +202,31 @@ public abstract class LocalLog implements Closeable
             return this;
         }
 
+        public LogSpec afterReplay(Startup.AfterReplay afterReplay)
+        {
+            this.afterReplay = afterReplay;
+            return this;
+        }
+
         public LogSpec withInitialState(ClusterMetadata initial)
         {
             this.initial = initial;
             return this;
         }
 
-        public LogSpec withReadyNotification(WhenReady whenReady)
+        public LogSpec withPreviousState(ClusterMetadata prev)
         {
-            this.whenReady = whenReady;
+            this.prev = prev;
             return this;
         }
+
+        public final LocalLog createLog()
+        {
+            if (async)
+                return new Async(this);
+            else
+                return new Sync(this);
+        }
     }
 
     /**
@@ -196,21 +249,27 @@ public abstract class LocalLog implements Closeable
         return e1.epoch.compareTo(e2.epoch);
     });
 
-    protected final LogStorage persistence;
+    protected final LogStorage storage;
     protected final Set<LogListener> listeners;
     protected final Set<ChangeListener> changeListeners;
     protected final Set<ChangeListener.Async> asyncChangeListeners;
-    private final LogSpec spec;
+    protected final LogSpec spec;
 
-    private LocalLog(LogSpec spec)
+    private LocalLog(LogSpec logSpec)
     {
-        assert spec.initial.epoch.is(EMPTY) || 
spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.reset;
-        committed = new AtomicReference<>(spec.initial);
-        this.persistence = spec.storage;
+        this.spec = logSpec;
+        if (spec.initial == null)
+            spec.initial = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
+        if (spec.prev == null)
+            spec.prev = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
+        assert spec.initial.epoch.is(EMPTY) || 
spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.isReset :
+        String.format("Should start with empty epoch, unless we're in upgrade or reset mode: %s (isReset: %s)", spec.initial, spec.isReset);
+
+        this.committed = new AtomicReference<>(logSpec.initial);
+        this.storage = logSpec.storage;
         listeners = Sets.newConcurrentHashSet();
         changeListeners = Sets.newConcurrentHashSet();
         asyncChangeListeners = Sets.newConcurrentHashSet();
-        this.spec = spec;
     }
 
     public void bootstrap(InetAddressAndPort addr)
@@ -250,34 +309,6 @@ public abstract class LocalLog implements Closeable
         return pending.size();
     }
 
-    public static LocalLog sync(LogSpec spec)
-    {
-        return new Sync(spec);
-    }
-
-    public static LocalLog async(LogSpec spec)
-    {
-        return new Async(spec);
-    }
-
-    @VisibleForTesting
-    public static LocalLog asyncForTests()
-    {
-        LogSpec logSpec = new LogSpec();
-        LocalLog log = new Async(logSpec);
-        log.ready();
-        return log;
-    }
-
-    @VisibleForTesting
-    public static LocalLog asyncForTests(ClusterMetadata initial)
-    {
-        LogSpec logSpec = new LogSpec().withInitialState(initial);
-        LocalLog log = new Async(logSpec);
-        log.ready();
-        return log;
-    }
-
     public boolean hasGaps()
     {
         Epoch start = committed.get().epoch;
@@ -305,7 +336,7 @@ public abstract class LocalLog implements Closeable
 
     public Replication getCommittedEntries(Epoch since)
     {
-        return persistence.getReplication(since);
+        return storage.getReplication(since);
     }
 
     public ClusterMetadata waitForHighestConsecutive()
@@ -448,7 +479,7 @@ public abstract class LocalLog implements Closeable
                                   next.epoch, pendingEntry.transform, 
prev.epoch);
 
                     if (replayComplete.get())
-                        
persistence.append(transformed.success().metadata.period, 
pendingEntry.maybeUnwrapExecuted());
+                        storage.append(transformed.success().metadata.period, 
pendingEntry.maybeUnwrapExecuted());
 
                     notifyPreCommit(prev, next, isSnapshot);
 
@@ -505,13 +536,13 @@ public abstract class LocalLog implements Closeable
     /**
      * Replays items that were persisted during previous starts. Replayed 
items _will not_ be persisted again.
      */
-    public Epoch replayPersisted()
+    private ClusterMetadata replayPersisted()
     {
         if (replayComplete.get())
             throw new IllegalStateException("Can only replay persisted once.");
-        LogState logState = persistence.getLogState(metadata().epoch);
+        LogState logState = storage.getLogState(metadata().epoch);
         append(logState);
-        return waitForHighestConsecutive().epoch;
+        return waitForHighestConsecutive();
     }
 
     private void maybeNotifyListeners(Entry entry, Transformation.Result 
result)
@@ -538,11 +569,12 @@ public abstract class LocalLog implements Closeable
         this.changeListeners.remove(listener);
     }
 
-    public void notifyListeners(ClusterMetadata emptyFromSystemTables)
+    public void notifyListeners(ClusterMetadata prev)
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        notifyPreCommit(emptyFromSystemTables, metadata, true);
-        notifyPostCommit(emptyFromSystemTables, metadata, true);
+        ClusterMetadata metadata = committed.get();
+        logger.info("Notifying listeners, prev epoch = {}, current epoch = 
{}", prev.epoch, metadata.epoch);
+        notifyPreCommit(prev, metadata, true);
+        notifyPostCommit(prev, metadata, true);
     }
 
     private void notifyPreCommit(ClusterMetadata before, ClusterMetadata 
after, boolean fromSnapshot)
@@ -561,15 +593,59 @@ public abstract class LocalLog implements Closeable
             ScheduledExecutors.optionalTasks.submit(() -> 
listener.notifyPostCommit(before, after, fromSnapshot));
     }
 
+    /**
+     * Essentially the same as `ready`, but rethrows any StartupException as an unchecked exception
+     */
+    @VisibleForTesting
+    public final ClusterMetadata readyUnchecked()
+    {
+        try
+        {
+            return ready();
+        }
+        catch (StartupException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ClusterMetadata ready() throws StartupException
+    {
+        ClusterMetadata metadata = replayPersisted();
+        spec.afterReplay.accept(metadata);
+        logger.info("Marking LocalLog ready at epoch {}", metadata.epoch);
+
+        if (!replayComplete.compareAndSet(false, true))
+            throw new IllegalStateException("Log is already fully 
initialised");
+
+        logger.debug("Marking LocalLog ready at epoch {}", 
committed.get().epoch);
+        if (spec.defaultListeners)
+        {
+            logger.info("Adding default listeners to LocalLog");
+            addListeners();
+        }
+        else
+        {
+            logger.info("Adding specified listeners to LocalLog");
+            spec.listeners.forEach(this::addListener);
+            spec.changeListeners.forEach(this::addListener);
+            spec.asyncChangeListeners.forEach(this::addListener);
+        }
+
+        logger.info("Notifying all registered listeners of both pre and post 
commit event");
+        notifyListeners(spec.prev);
+
+        return metadata;
+    }
 
     private static class Async extends LocalLog
     {
         private final AsyncRunnable runnable;
         private final Interruptible executor;
 
-        private Async(LogSpec spec)
+        private Async(LogSpec logSpec)
         {
-            super(spec);
+            super(logSpec);
             this.runnable = new AsyncRunnable();
             this.executor = 
ExecutorFactory.Global.executorFactory().infiniteLoop("GlobalLogFollower", 
runnable, SAFE, NON_DAEMON, UNSYNCHRONIZED);
         }
@@ -743,9 +819,9 @@ public abstract class LocalLog implements Closeable
 
     private static class Sync extends LocalLog
     {
-        private Sync(LogSpec spec)
+        private Sync(LogSpec logSpec)
         {
-            super(spec);
+            super(logSpec);
         }
 
         void runOnce(DurationSpec durationSpec)
@@ -780,7 +856,7 @@ public abstract class LocalLog implements Closeable
 
         addListener(snapshotListener());
         addListener(new InitializationListener());
-        addListener(new SchemaListener());
+        addListener(new SchemaListener(spec.loadSSTables));
         addListener(new LegacyStateListener());
         addListener(new PlacementsChangeListener());
         addListener(new MetadataSnapshotListener());
@@ -788,45 +864,6 @@ public abstract class LocalLog implements Closeable
         addListener(new UpgradeMigrationListener());
     }
 
-    public void ready()
-    {
-        if (!replayComplete.compareAndSet(false, true))
-            throw new IllegalStateException("Log is already fully 
initialised");
-
-        logger.debug("Marking LocalLog ready at epoch {}", 
committed.get().epoch);
-        if (spec.defaultListeners)
-        {
-            logger.debug("Adding default listeners to LocalLog");
-            addListeners();
-        }
-        else
-        {
-            logger.debug("Adding specified listeners to LocalLog");
-            spec.listeners.forEach(this::addListener);
-            spec.changeListeners.forEach(this::addListener);
-            spec.asyncChangeListeners.forEach(this::addListener);
-        }
-
-        switch (spec.whenReady)
-        {
-            case ALL:
-                logger.debug("Notifying all registered listeners of both pre 
and post commit event");
-                notifyListeners(spec.initial);
-                break;
-            case PRE_COMMIT_ONLY:
-                logger.debug("Notifying all registered listeners of pre-commit 
event only");
-                notifyPreCommit(spec.initial, committed.get(), true);
-                break;
-            case POST_COMMIT_ONLY:
-                logger.debug("Notifying all registered listeners of 
post-commit event only");
-                notifyPostCommit(spec.initial, committed.get(), true);
-                break;
-            case NONE:
-                logger.debug("Not notifying registered listeners of pre or 
post commit events");
-                break;
-        }
-    }
-
     private LogListener snapshotListener()
     {
         return (entry, metadata) -> {
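
With the factories gone, ready() is the single place where replay (replayPersisted()), the afterReplay hook, listener registration and the initial notification happen, and it now throws StartupException; readyUnchecked() is the test/tool wrapper that rethrows it unchecked. A minimal test-side sketch using the in-memory storage that the test changes below also use (the wrapper class is illustrative only):

    import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
    import org.apache.cassandra.tcm.log.LocalLog;

    class InMemoryLogSketch
    {
        static LocalLog openForTest()
        {
            LocalLog log = LocalLog.logSpec()
                                   .sync()                                                        // no background follower thread
                                   .withStorage(new AtomicLongBackedProcessor.InMemoryStorage())  // no system keyspace needed
                                   .createLog();
            log.readyUnchecked(); // ready(), with any StartupException rethrown as a RuntimeException
            return log;
        }
    }
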
diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java 
b/src/java/org/apache/cassandra/tcm/log/LogStorage.java
index e35dcfb2b1..3c88cf9b86 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java
@@ -43,6 +43,7 @@ public interface LogStorage extends LogReader
         {
             return LogState.EMPTY;
         }
+        public void truncate() {}
         public Replication getReplication(Epoch since)
         {
             return Replication.EMPTY;
diff --git a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java 
b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
index 83c443731b..1f372f7318 100644
--- a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
+++ b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
@@ -101,6 +101,11 @@ public class SystemKeyspaceStorage implements LogStorage
         return LogState.getLogState(since, snapshots.get(), this);
     }
 
+    public void truncate()
+    {
+        
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(NAME).truncateBlockingWithoutSnapshot();
+    }
+
     /**
      * Uses the supplied period as a starting point to iterate through the log 
table
      * collating log entries which follow the supplied epoch. It is assumed 
that the
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
index b6b0eba2f6..2ffddaed7e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -110,8 +110,7 @@ public class JMXFeatureTest extends TestBaseImpl
             Assert.assertThat(statusResult.getStderr(), 
is(blankOrNullString()));
             Assert.assertThat(statusResult.getStdout(), containsString("DN  
127.0.0.1"));
             testInstance(instances, cluster.get(2));
-            ClusterUtils.start(instanceToStop, props -> {
-            });
+            ClusterUtils.start(instanceToStop, props -> {});
             ClusterUtils.awaitRingState(otherInstance, instanceToStop, 
"Normal");
             ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, "Up");
             statusResult = cluster.get(1).nodetoolResult("status");
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
index eb78bc7e38..bd4ea3c7db 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
@@ -96,9 +96,11 @@ public class CMSTestBase
             this.rf = rf;
             schemaProvider = Mockito.mock(SchemaProvider.class);
             ClusterMetadata initial = new ClusterMetadata(partitioner);
-            LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withInitialState(initial).withDefaultListeners(addListeners);
-            log = LocalLog.sync(logSpec);
-            log.ready();
+            log = LocalLog.logSpec()
+                          .sync()
+                          .withInitialState(initial)
+                          .withDefaultListeners(addListeners)
+                          .createLog();
 
             service = new ClusterMetadataService(new UniformRangePlacement(),
                                                  MetadataSnapshots.NO_OP,
@@ -108,6 +110,7 @@ public class CMSTestBase
                                                  true);
 
             ClusterMetadataService.setInstance(service);
+            log.readyUnchecked();
             log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
             service.commit(new Initialize(ClusterMetadata.current()) {
                 public Result execute(ClusterMetadata prev)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 0046e170c8..093ac0bcd6 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -22,8 +22,8 @@ import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Random;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -46,9 +46,9 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.Schema;
@@ -63,6 +63,8 @@ import org.apache.cassandra.tcm.MetadataSnapshots;
 import org.apache.cassandra.tcm.Period;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.log.LocalLog;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.tcm.log.Replication;
 import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
@@ -73,9 +75,9 @@ import 
org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
 import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
-import org.apache.cassandra.tcm.sequences.Move;
 import org.apache.cassandra.tcm.sequences.LeaveStreams;
 import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
+import org.apache.cassandra.tcm.sequences.Move;
 import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
 import org.apache.cassandra.tcm.transformations.AlterSchema;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
@@ -123,13 +125,15 @@ public class ClusterMetadataTestHelper
     public static ClusterMetadataService instanceForTest()
     {
         ClusterMetadata current = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
-        LocalLog log = LocalLog.asyncForTests();
+        LocalLog log = LocalLog.logSpec()
+                               .createLog();
         ResettableClusterMetadataService service = new 
ResettableClusterMetadataService(new UniformRangePlacement(),
                                                                                
         MetadataSnapshots.NO_OP,
                                                                                
         log,
                                                                                
         new AtomicLongBackedProcessor(log),
                                                                                
         Commit.Replicator.NO_OP,
                                                                                
         true);
+        log.readyUnchecked();
         log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
         service.commit(new Initialize(current));
         QueryProcessor.registerStatementInvalidatingListener();
@@ -137,26 +141,6 @@ public class ClusterMetadataTestHelper
         return service;
     }
 
-    /**
-     * Create a pre-configured CMS which supports mark & reset for use in 
tests. This version dose not perform initial
-     * CMS setup, neither bootstrapping the log nor applying an Initialize 
transformation. It assumes that the supplied
-     * ClusterMetadata instance is in the state required by the specific 
caller.
-     * @return a resettable CMS instance, to be used in a call to 
ClusterMetadataService::setInstance
-     */
-    public static ClusterMetadataService instanceForTest(ClusterMetadata 
current)
-    {
-        LocalLog log = LocalLog.asyncForTests();
-        ResettableClusterMetadataService service = new 
ResettableClusterMetadataService(new UniformRangePlacement(),
-                                                                               
         MetadataSnapshots.NO_OP,
-                                                                               
         log,
-                                                                               
         new AtomicLongBackedProcessor(log),
-                                                                               
         Commit.Replicator.NO_OP,
-                                                                               
         true);
-        QueryProcessor.registerStatementInvalidatingListener();
-        service.mark();
-        return service;
-    }
-
     public static ClusterMetadata minimalForTesting(IPartitioner partitioner)
     {
         return new ClusterMetadata(Epoch.EMPTY,
@@ -186,13 +170,13 @@ public class ClusterMetadataTestHelper
                                    null,
                                    ImmutableMap.of());
     }
+
     public static void forceCurrentPeriodTo(long period)
     {
+        ClusterMetadataService.unsetInstance();
+        ClusterMetadataService.setInstance(instanceForTest());
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata == null)
-            metadata = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
-
-        metadata = new ClusterMetadata(metadata.epoch,
+        metadata = new ClusterMetadata(metadata.epoch.nextEpoch(),
                                        period,
                                        metadata.lastInPeriod,
                                        metadata.partitioner,
@@ -203,20 +187,23 @@ public class ClusterMetadataTestHelper
                                        metadata.lockedRanges,
                                        metadata.inProgressSequences,
                                        metadata.extensions);
-        ClusterMetadataService.unsetInstance();
-        ClusterMetadataService.setInstance(instanceForTest(metadata));
+        ClusterMetadataService.instance().log().append(new LogState(metadata, 
Replication.of(Collections.emptyList())));
     }
 
     public static ClusterMetadataService syncInstanceForTest()
     {
-        LocalLog log = LocalLog.sync(new LocalLog.LogSpec());
-        log.ready();
-        return new ClusterMetadataService(new UniformRangePlacement(),
-                                          MetadataSnapshots.NO_OP,
-                                          log,
-                                          new AtomicLongBackedProcessor(log),
-                                          Commit.Replicator.NO_OP,
-                                          true);
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .createLog();
+        ClusterMetadataService cms = new ClusterMetadataService(new 
UniformRangePlacement(),
+                                                                
MetadataSnapshots.NO_OP,
+                                                                log,
+                                                                new 
AtomicLongBackedProcessor(log),
+                                                                
Commit.Replicator.NO_OP,
+                                                                true);
+
+        log.readyUnchecked();
+        return cms;
     }
 
     public static void createKeyspace(String name, KeyspaceParams params)
@@ -247,7 +234,6 @@ public class ClusterMetadataTestHelper
         }
     }
 
-
     private static Set<InetAddressAndPort> leaving(ClusterMetadata metadata)
     {
         return  metadata.directory.states.entrySet().stream()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
index 8c41d17ddc..512fa19c5b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
@@ -642,9 +642,12 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
             assert executor == null;
             LogStorage logStorage = new 
AtomicLongBackedProcessor.InMemoryStorage();
             ClusterMetadata initial = new ClusterMetadata(partitioner);
-            LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withInitialState(initial).withStorage(logStorage);
-            LocalLog log = LocalLog.sync(logSpec);
-            log.ready();
+            LocalLog log = LocalLog.logSpec()
+                                   .withInitialState(initial)
+                                   .sync()
+                                   .withStorage(logStorage)
+                                   .createLog();
+
             // Replicator only replicates to the node under test, as there are 
no other nodes in reality
             Commit.Replicator replicator = (result, source) -> {
                 
realCluster.deliverMessage(realCluster.get(1).broadcastAddress(),
@@ -661,7 +664,7 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                                                         
replicator,
                                                                         true);
             ClusterMetadataService.setInstance(service);
-
+            log.readyUnchecked();
             log.bootstrap(cms.addr());
             service.commit(new Initialize(log.metadata()));
             service.commit(new Register(new NodeAddresses(cms.addr()), new 
Location(cms.dc(), cms.rack()), NodeVersion.CURRENT));
@@ -720,8 +723,9 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
 
             // We need to create a second node to be able to send and receive 
messages.
             RealSimulatedNode driver = createNode();
-            LocalLog log = LocalLog.sync(new LocalLog.LogSpec());
-            log.ready();
+            LocalLog log = LocalLog.logSpec()
+                           .sync()
+                           .createLog();
 
             ClusterMetadataService metadataService =
             new ClusterMetadataService(new UniformRangePlacement(),
@@ -754,6 +758,8 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                        false);
 
             ClusterMetadataService.setInstance(metadataService);
+            log.readyUnchecked();
+
             driver.clean(TCM_REPLICATION);
             driver.on(Verb.TCM_REPLICATION, new SimulatedAction<Replication, 
NoPayload>()
             {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
index 6e819d825a..5ca7bd9f5d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
@@ -94,16 +94,16 @@ public class DecommissionTest extends TestBaseImpl
     }
 
     @Test
-    public void testDecomDirectoryMinMaxVersions() throws IOException
-    {
-        try (Cluster cluster = builder().withNodes(3)
+    public void testDecomDirectoryMinMaxVersions() throws IOException {
+        try (Cluster cluster = builder()
+                               .withConfig(cfg -> cfg.with(GOSSIP))
+                               .withNodes(3)
                 .start())
         {
             cluster.get(3).nodetoolResult("decommission", 
"--force").asserts().success();
 
             cluster.get(1).runOnInstance(() -> {
                 ClusterMetadata metadata = ClusterMetadata.current();
-
                 ClusterMetadataService.instance().commit(new 
Startup(metadata.myNodeId(),
                                                                      
metadata.directory.getNodeAddresses(metadata.myNodeId()),
                                                                      new 
NodeVersion(new CassandraVersion("6.0.0"),
@@ -133,6 +133,7 @@ public class DecommissionTest extends TestBaseImpl
     @Test
     public void testMixedVersionBlockDecom() throws IOException {
         try (Cluster cluster = builder().withNodes(3)
+                                        .withConfig(config -> 
config.with(GOSSIP))
                                         .start())
         {
             cluster.get(3).nodetoolResult("decommission", 
"--force").asserts().success();
diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java 
b/test/unit/org/apache/cassandra/ServerTestUtils.java
index 42a18600e6..29e1a84a1e 100644
--- a/test/unit/org/apache/cassandra/ServerTestUtils.java
+++ b/test/unit/org/apache/cassandra/ServerTestUtils.java
@@ -24,8 +24,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,21 +33,20 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.big.BigTableReader;
 import org.apache.cassandra.io.sstable.indexsummary.IndexSummarySupport;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.security.ThreadAwareSecurityManager;
 import org.apache.cassandra.service.EmbeddedCassandraService;
 import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
@@ -59,6 +58,7 @@ import org.apache.cassandra.tcm.MetadataSnapshots;
 import org.apache.cassandra.tcm.Processor;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogStorage;
+import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.PlacementProvider;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
@@ -265,7 +265,7 @@ public final class ServerTestUtils
 
     public static void initCMS()
     {
-        // Effectively disable automatic snapshots using 
AtomicLongBackedProcessopr and LocaLLog.Sync interacts
+        // Effectively disable automatic snapshots using 
AtomicLongBackedProcessor and LocalLog.Sync interacts
         // badly with submitting SealPeriod transformations from the log 
listener. In this configuration, SealPeriod
         // commits performed on NonPeriodicTasks threads end up actually 
performing the transformations as well as
         // calling the pre and post commit listeners, which is not threadsafe. 
In a non-test setup the processing of
@@ -276,9 +276,14 @@ public final class ServerTestUtils
         IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
         boolean addListeners = true;
         ClusterMetadata initial = new ClusterMetadata(partitioner);
-        LocalLog.LogSpec logSpec = new 
LocalLog.LogSpec().withInitialState(initial)
-                                                         
.withDefaultListeners(addListeners);
-        LocalLog log = LocalLog.async(logSpec);
+        if (!Keyspace.isInitialized())
+            Keyspace.setInitialized();
+
+        LocalLog log = LocalLog.logSpec()
+                               .withInitialState(initial)
+                               .withDefaultListeners(addListeners)
+                               .createLog();
+
         ResettableClusterMetadataService service = new 
ResettableClusterMetadataService(new UniformRangePlacement(),
                                                                                
         MetadataSnapshots.NO_OP,
                                                                                
         log,
@@ -287,10 +292,7 @@ public final class ServerTestUtils
                                                                                
         true);
 
         ClusterMetadataService.setInstance(service);
-        initial.schema.initializeKeyspaceInstances(DistributedSchema.empty());
-        if (!Keyspace.isInitialized())
-            Keyspace.setInitialized();
-        log.ready();
+        log.readyUnchecked();
         log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
         service.commit(new Initialize(ClusterMetadata.current()));
         QueryProcessor.registerStatementInvalidatingListener();
@@ -309,12 +311,13 @@ public final class ServerTestUtils
         // |-- StorageService.instance.setPartitionerUnsafe(M3P)  # test wants 
to use LongToken
         // |-- ServerTestUtils.recreateCMS                        # recreates 
the CMS using the updated partitioner
         ClusterMetadata initial = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
-        initial.schema.initializeKeyspaceInstances(DistributedSchema.empty());
-        LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial)
-                                                         .withStorage(LogStorage.SystemKeyspace)
-                                                         .withDefaultListeners();
-        LocalLog log = LocalLog.async(logSpec);
-        log.ready();
+        LogStorage storage = LogStorage.SystemKeyspace;
+        LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           .withInitialState(initial)
+                                           .withStorage(storage)
+                                           .withDefaultListeners();
+        LocalLog log = logSpec.createLog();
+
         ResettableClusterMetadataService cms = new ResettableClusterMetadataService(new UniformRangePlacement(),
                                                                                      MetadataSnapshots.NO_OP,
                                                                                      log,
@@ -323,6 +326,8 @@ public final class ServerTestUtils
                                                                                      true);
         ClusterMetadataService.unsetInstance();
         ClusterMetadataService.setInstance(cms);
+        ((SystemKeyspaceStorage)LogStorage.SystemKeyspace).truncate();
+        log.readyUnchecked();
         log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
         cms.mark();
     }
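
    For reference, the test bootstrap above reduces to roughly the following builder-style
    construction. This is only a sketch assembled from the LogSpec calls visible in these hunks
    (logSpec(), sync(), withInitialState(), withStorage(), withDefaultListeners(), createLog(),
    readyUnchecked()); it is not a verbatim excerpt from the tree, and the wrapper class is
    hypothetical.

        import org.apache.cassandra.config.DatabaseDescriptor;
        import org.apache.cassandra.tcm.ClusterMetadata;
        import org.apache.cassandra.tcm.log.LocalLog;
        import org.apache.cassandra.tcm.log.LogStorage;
        import org.apache.cassandra.utils.FBUtilities;

        // Hypothetical helper for illustration only; not part of this patch.
        public class LogSpecUsageSketch
        {
            public static LocalLog buildTestLog()
            {
                // Seed the log with an initial ClusterMetadata for the configured partitioner.
                ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner());

                // The LogSpec builder now owns construction; tests previously called
                // LocalLog.sync(new LocalLog.LogSpec()...) or LocalLog.async(logSpec).
                LocalLog log = LocalLog.logSpec()
                                       .sync()                                 // process entries synchronously
                                       .withInitialState(initial)
                                       .withStorage(LogStorage.SystemKeyspace)
                                       .withDefaultListeners()
                                       .createLog();

                // readyUnchecked() replaces the earlier ready() call; the explicit
                // initializeKeyspaceInstances(DistributedSchema.empty()) step before it is gone in these hunks.
                log.readyUnchecked();
                log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
                return log;
            }
        }
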
diff --git a/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java b/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java
index e1d7532ac0..3d8c443121 100644
--- a/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
 import com.google.common.collect.ImmutableMap;
 import org.junit.After;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -47,6 +48,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+@Ignore("TODO: TCM")
 public class HintsUpgradeTest
 {
     static
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index 09fda7c5f5..cf6d6ed400 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,6 +33,7 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.SimpleClient;
@@ -74,12 +74,12 @@ public class ClientWarningsTest extends CQLTester
         {
             client.connect(false);
 
-            QueryMessage query = new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT);
-            Message.Response resp = client.execute(query);
+            Message.Response resp = executeWithRetries(client,
+                                                       new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT));
             assertNull(resp.getWarnings());
 
-            query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
-            resp = client.execute(query);
+            resp = executeWithRetries(client,
+                                      new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT));
             assertEquals(1, resp.getWarnings().size());
         }
     }
@@ -94,12 +94,14 @@ public class ClientWarningsTest extends CQLTester
         {
             client.connect(false);
 
-            QueryMessage query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT);
-            Message.Response resp = client.execute(query);
+            Message.Response resp = executeWithRetries(client,
+                                                       new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT));
             assertEquals(1, resp.getWarnings().size());
 
-            query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
-            resp = client.execute(query);
+
+            resp = executeWithRetries(client,
+                                      new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT));
+
             assertNull(resp.getWarnings());
         }
     }
@@ -116,35 +118,50 @@ public class ClientWarningsTest extends CQLTester
 
             for (int i = 0; i < iterations; i++)
             {
-                QueryMessage query = new QueryMessage(String.format("INSERT 
INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)",
-                                                                    KEYSPACE,
-                                                                    
currentTable(),
-                                                                    i), 
QueryOptions.DEFAULT);
-                client.execute(query);
+                executeWithRetries(client,
+                                   new QueryMessage(String.format("INSERT INTO 
%s.%s (pk, ck, v) VALUES (1, %s, 1)",
+                                                                  KEYSPACE,
+                                                                  
currentTable(),
+                                                                  i), 
QueryOptions.DEFAULT));
             }
             ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
             Util.flush(store);
 
             for (int i = 0; i < iterations; i++)
             {
-                QueryMessage query = new QueryMessage(String.format("DELETE v 
FROM %s.%s WHERE pk = 1 AND ck = %s",
-                                                                    KEYSPACE,
-                                                                    
currentTable(),
-                                                                    i), 
QueryOptions.DEFAULT);
-                client.execute(query);
+                executeWithRetries(client,
+                                   new QueryMessage(String.format("DELETE v 
FROM %s.%s WHERE pk = 1 AND ck = %s",
+                                                                  KEYSPACE,
+                                                                  
currentTable(),
+                                                                  i), 
QueryOptions.DEFAULT));
             }
             Util.flush(store);
 
             {
-                QueryMessage query = new QueryMessage(String.format("SELECT * 
FROM %s.%s WHERE pk = 1",
-                                                                    KEYSPACE,
-                                                                    
currentTable()), QueryOptions.DEFAULT);
-                Message.Response resp = client.execute(query);
+                Message.Response resp = executeWithRetries(client,
+                                                           new 
QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1",
+                                                                               
           KEYSPACE,
+                                                                               
           currentTable()), QueryOptions.DEFAULT));
                 assertEquals(1, resp.getWarnings().size());
             }
         }
     }
 
+    private static Message.Response executeWithRetries(SimpleClient client, QueryMessage query)
+    {
+        for (int i = 0; i < 10; i++)
+        {
+            try
+            {
+                return client.execute(query);
+            }
+            catch (RequestTimeoutException t)
+            {
+                logger.warn("Timed out. Retrying.");
+            }
+        }
+        throw new RuntimeException("Could not execute query after 10 tries");
+    }
     @Test
     public void testLargeBatchWithProtoV2() throws Exception
     {
@@ -154,8 +171,8 @@ public class ClientWarningsTest extends CQLTester
         {
             client.connect(false);
 
-            QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
-            Message.Response resp = client.execute(query);
+            Message.Response resp = executeWithRetries(client,
+                                                       new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT));
             assertNull(resp.getWarnings());
         }
     }
diff --git a/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java b/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java
index 831204d7ac..ffb55e3048 100644
--- a/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java
+++ b/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.tcm;
 
-import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
@@ -99,7 +98,7 @@ public class BootWithMetadataTest
             epoch = doTest(Epoch.create(epoch.getEpoch() + 100), first);
     }
 
-    private Epoch doTest(Epoch epoch, ClusterMetadata first) throws IOException
+    private Epoch doTest(Epoch epoch, ClusterMetadata first) throws Throwable
     {
         long seed = System.nanoTime();
         logger.info("STARTING TEST FROM EPOCH {}, SEED: {}", epoch, seed);
diff --git a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
index af24888858..f8380a6cb6 100644
--- a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
@@ -60,8 +60,10 @@ public class DiscoverySimulationTest
     public static void setup()
     {
         ClusterMetadata initial = new ClusterMetadata(Murmur3Partitioner.instance);
-        LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(initial));
-        log.ready();
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .withInitialState(initial)
+                               .createLog();
 
         ClusterMetadataService cms = new ClusterMetadataService(new UniformRangePlacement(),
                                                                  MetadataSnapshots.NO_OP,
@@ -70,6 +72,7 @@ public class DiscoverySimulationTest
                                                                  Commit.Replicator.NO_OP,
                                                                  false);
         ClusterMetadataService.setInstance(cms);
+        log.readyUnchecked();
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/tcm/LogStateTest.java b/test/unit/org/apache/cassandra/tcm/LogStateTest.java
index 309847d826..2258362fa7 100644
--- a/test/unit/org/apache/cassandra/tcm/LogStateTest.java
+++ b/test/unit/org/apache/cassandra/tcm/LogStateTest.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.extensions.ExtensionValue;
 import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener;
+import org.apache.cassandra.tcm.listeners.SchemaListener;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogStorage;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
@@ -54,10 +54,13 @@ public class LogStateTest
         LogStorage logStorage = LogStorage.SystemKeyspace;
         MetadataSnapshots snapshots = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots();
         ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
-        LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial)
-                                                         .withStorage(logStorage)
-                                                         .withLogListener(new MetadataSnapshotListener());
-        LocalLog log = LocalLog.sync(logSpec);
+        LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           .sync()
+                                           .withInitialState(initial)
+                                           .withStorage(logStorage)
+                                           .withLogListener(new MetadataSnapshotListener())
+                                           .withListener(new SchemaListener(true));
+        LocalLog log = logSpec.createLog();
         ClusterMetadataService cms = new ClusterMetadataService(new UniformRangePlacement(),
                                                                 snapshots,
                                                                 log,
@@ -66,8 +69,7 @@ public class LogStateTest
                                                                 false);
         ClusterMetadataService.unsetInstance();
         ClusterMetadataService.setInstance(cms);
-        initial.schema.initializeKeyspaceInstances(DistributedSchema.empty());
-        log.ready();
+        log.readyUnchecked();
         log.bootstrap(FBUtilities.getBroadcastAddressAndPort());
     }
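
    The listener wiring above can be read as the sketch below: the spec now collects the log
    listener (MetadataSnapshotListener) and the change listener (SchemaListener) up front and
    only then creates the log, instead of the test initialising keyspace instances directly and
    calling ready(). Only the calls shown in this hunk are assumed (withLogListener(),
    withListener(), SchemaListener(true)); the surrounding class is illustrative, not from the tree.

        import org.apache.cassandra.config.DatabaseDescriptor;
        import org.apache.cassandra.tcm.ClusterMetadata;
        import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener;
        import org.apache.cassandra.tcm.listeners.SchemaListener;
        import org.apache.cassandra.tcm.log.LocalLog;
        import org.apache.cassandra.tcm.log.LogStorage;

        // Hypothetical wrapper for illustration only; not part of this patch.
        public class ListenerWiringSketch
        {
            public static LocalLog buildLogWithListeners()
            {
                ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
                return LocalLog.logSpec()
                               .sync()                                          // synchronous variant, as in LogStateTest
                               .withInitialState(initial)
                               .withStorage(LogStorage.SystemKeyspace)          // persist entries via the system keyspace log storage
                               .withLogListener(new MetadataSnapshotListener()) // log listener that produces metadata snapshots
                               .withListener(new SchemaListener(true))          // schema listener, constructed with the same flag as above
                               .createLog();
            }
        }
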
 
diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
index f3f03c9f04..bcbe663736 100644
--- a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
+++ b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
@@ -63,8 +63,11 @@ public class LocalLogTest
     @Test
     public void appendToFillGapWithConsecutiveBufferedEntries()
     {
-        LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(cm()));
-        log.ready();
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .withInitialState(cm())
+                               .createLog();
+        log.readyUnchecked();
         Epoch start = log.metadata().epoch;
         assertEquals(EMPTY, start);
 
@@ -87,10 +90,13 @@ public class LocalLogTest
     @Test
     public void sealPeriodForceSnapshotCollisionWithGap()
     {
-        LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(cm()));
-        log.ready();
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .withInitialState(cm())
+                               .createLog();
+        log.readyUnchecked();
 
-        List<Entry> entries =new ArrayList<>();
+        List<Entry> entries = new ArrayList<>();
         for (int i = 1; i <= 9; i++)
             entries.add(entry(i));
         entries.add(new Entry(Entry.Id.NONE,
@@ -112,8 +118,11 @@ public class LocalLogTest
     @Test
     public void multipleSnapshotEntries()
     {
-        LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(cm()));
-        log.ready();
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .withInitialState(cm())
+                               .createLog();
+        log.readyUnchecked();
 
         List<Entry> entries =new ArrayList<>();
         for (int i = 1; i <= 9; i++)
@@ -165,7 +174,7 @@ public class LocalLogTest
         CountDownLatch finish = CountDownLatch.newCountDownLatch(threads);
         CountDownLatch finishReaders = CountDownLatch.newCountDownLatch(threads);
         ExecutorPlus executor = executorFactory().configurePooled("APPENDER", threads * 2).build();
-        LocalLog log = LocalLog.asyncForTests(cm());
+        LocalLog log = LocalLog.logSpec().withInitialState(cm()).createLog();
 
         List<Entry> committed = new CopyOnWriteArrayList<>(); // doesn't need to be concurrent, since log is single-threaded
         log.addListener((e, m) -> committed.add(e));
diff --git a/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java b/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java
index d9a14e4e3f..59aafa37e0 100644
--- a/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java
@@ -94,10 +94,13 @@ public class LogListenerNotificationTest
                 counter++;
             }
         };
-        LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(cm()).withLogListener(listener);
-        LocalLog log = LocalLog.sync(logSpec);
+        LocalLog log = LocalLog.logSpec()
+                               .sync()
+                               .withInitialState(cm())
+                               .withLogListener(listener)
+                               .createLog();
+        log.readyUnchecked();
         log.append(new Entry(Entry.Id.NONE, Epoch.FIRST, PreInitialize.forTesting()));
-        log.ready();
         log.append(input);
     }
 
diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
index 90d71abaa7..83aafece69 100644
--- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
+++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -64,7 +64,6 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JavaDriverUtils;
@@ -620,7 +619,6 @@ public class StressCQLSSTableWriter implements Closeable
          */
         public static ColumnFamilyStore createOfflineTable(CreateTableStatement.Raw schemaStatement, List<CreateTypeStatement.Raw> typeStatements, List<File> directoryList)
         {
-            ClusterMetadata prev = ClusterMetadata.current();
             String keyspace = schemaStatement.keyspace();
 
             KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1));
@@ -644,7 +642,6 @@ public class StressCQLSSTableWriter implements Closeable
             Tables tables = Tables.of(tableMetadata);
             KeyspaceMetadata updated = ksm.withSwapped(tables);
             Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated));
-            ClusterMetadata.current().schema.initializeKeyspaceInstances(prev.schema, false);
             Keyspace.setInitialized();
             Directories directories = new Directories(tableMetadata, directoryList.stream().map(f -> new Directories.DataDirectory(new org.apache.cassandra.io.util.File(f.toPath()))).collect(Collectors.toList()));
 

