This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 633b40766b046cd852f312e51c4b6dbc60e49add
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 30 16:52:13 2023 +0100

    [CEP-21] During startup, don't open SSTables until local metadata log replay is complete
    
    patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov
    for CASSANDRA-18458
---
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 18 ++++-
 .../apache/cassandra/schema/DistributedSchema.java |  6 +-
 src/java/org/apache/cassandra/tcm/Startup.java     | 79 ++++++++++++----------
 .../cassandra/tcm/listeners/SchemaListener.java    | 21 +++++-
 .../org/apache/cassandra/tcm/log/LocalLog.java     | 13 ++--
 5 files changed, 91 insertions(+), 46 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 99e0bcde68..da85799a7a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -506,6 +506,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
             initialMemtable = createMemtable(new 
AtomicReference<>(CommitLog.instance.getCurrentPosition()));
             memtableMetrics = memtableFactory.createMemtableMetrics(metadata);
         }
+        metric = new TableMetrics(this, memtableMetrics);
         data = new Tracker(this, initialMemtable, loadSSTables);
 
         // Note that this needs to happen before we load the first sstables, 
or the global sstable tracker will not
@@ -537,8 +538,6 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
             indexManager.addIndex(info, true);
         }
 
-        metric = new TableMetrics(this, memtableMetrics);
-
         if (data.loadsstables)
         {
             data.updateInitialSSTableSize(sstables);
@@ -570,6 +569,21 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
             topPartitions = new TopPartitionTracker(initMetadata);
     }
 
+    public void loadInitialSSTables()
+    {
+        if (data.loadsstables)
+        {
+            logger.info("Attempted to load initial SSTables for {}.{} but this 
was done during construction, ignoring",
+                        keyspace.getName(), name);
+            return;
+        }
+        Collection<SSTableReader> sstables;
+        Directories.SSTableLister sstableFiles = 
directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+        sstables = SSTableReader.openAll(this, sstableFiles.list().entrySet(), 
metadata);
+        data.addInitialSSTablesWithoutUpdatingSize(sstables, this);
+        data.updateInitialSSTableSize(sstables);
+    }
+
     public static String getTableMBeanName(String ks, String name, boolean 
isIndex)
     {
         return 
String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java 
b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index f2e2356f9d..3ce318dd59 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -146,7 +146,7 @@ public class DistributedSchema implements 
MetadataValue<DistributedSchema>
                 delta.tables.dropped.forEach(t -> dropTable(keyspace, t, 
true));
 
                 // add tables and views
-                delta.tables.created.forEach(t -> createTable(keyspace, t));
+                delta.tables.created.forEach(t -> createTable(keyspace, t, 
loadSSTables));
                 delta.views.created.forEach(v -> createView(keyspace, v));
 
                 // update tables and views
@@ -238,10 +238,10 @@ public class DistributedSchema implements 
MetadataValue<DistributedSchema>
         SchemaDiagnostics.tableDropped(Schema.instance, metadata);
     }
 
-    private void createTable(Keyspace keyspace, TableMetadata table)
+    private void createTable(Keyspace keyspace, TableMetadata table, boolean 
loadSSTables)
     {
         SchemaDiagnostics.tableCreating(Schema.instance, table);
-        keyspace.initCf(table, true);
+        keyspace.initCf(table, loadSSTables);
         SchemaDiagnostics.tableCreated(Schema.instance, table);
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index a49a3d088e..5600dd12d3 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -18,37 +18,40 @@
 
 package org.apache.cassandra.tcm;
 
- import java.io.IOException;
- import java.util.Collections;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
- import java.util.function.Function;
-
- import com.google.common.util.concurrent.Uninterruptibles;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.SystemKeyspace;
- import org.apache.cassandra.db.commitlog.CommitLog;
- import org.apache.cassandra.gms.EndpointState;
- import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.gms.NewGossiper;
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.schema.DistributedSchema;
- import org.apache.cassandra.tcm.compatibility.GossipHelper;
- import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
- import org.apache.cassandra.tcm.migration.Election;
- import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
- import org.apache.cassandra.tcm.transformations.cms.Initialize;
- import org.apache.cassandra.utils.FBUtilities;
-
- import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
- import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables;
- import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.NewGossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.tcm.compatibility.GossipHelper;
+import org.apache.cassandra.tcm.listeners.SchemaListener;
+import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
+import org.apache.cassandra.tcm.migration.Election;
+import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
+import org.apache.cassandra.tcm.transformations.cms.Initialize;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
+import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables;
+import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates;
 
  public class Startup
  {
@@ -103,7 +106,6 @@ package org.apache.cassandra.tcm;
      {
          
ClusterMetadataService.instance().log().bootstrap(FBUtilities.getBroadcastAddressAndPort());
          assert ClusterMetadataService.state() == LOCAL : String.format("Can't 
initialize as node hasn't transitioned to CMS state. State: %s.\n%s", 
ClusterMetadataService.state(), ClusterMetadata.current());
-
          Initialize initialize = new Initialize(ClusterMetadata.current());
          ClusterMetadataService.instance().commit(initialize);
      }
@@ -116,23 +118,28 @@ package org.apache.cassandra.tcm;
                                                                        initial,
                                                                        
wrapProcessor,
                                                                        
ClusterMetadataService::state));
-
          ClusterMetadataService.instance().initRecentlySealedPeriodsIndex();
          ClusterMetadataService.instance().log().replayPersisted();
+         
ClusterMetadataService.instance().log().removeListener(SchemaListener.INSTANCE_FOR_STARTUP);
+         DistributedSchema schema = ClusterMetadata.current().schema;
+         schema.getKeyspaces().forEach(ksm -> {
+             Keyspace ks = schema.getKeyspace(ksm.name);
+             ks.getColumnFamilyStores().forEach(cfs -> {
+                 
cfs.concatWithIndexes().forEach(ColumnFamilyStore::loadInitialSSTables);
+             });
+         });
+         ClusterMetadataService.instance().log().addListener(new 
SchemaListener());
      }
 
      public static void initializeForDiscovery(Runnable initMessaging)
      {
          initMessaging.run();
-
          logger.debug("Discovering other nodes in the system");
          Discovery.DiscoveredNodes candidates = Discovery.instance.discover();
-
          if (candidates.kind() == Discovery.DiscoveredNodes.Kind.KNOWN_PEERS)
          {
              logger.debug("Got candidates: " + candidates);
              InetAddressAndPort min = 
candidates.nodes().stream().min(InetAddressAndPort::compareTo).get();
-
              // identify if you need to start the vote
              if (min.equals(FBUtilities.getBroadcastAddressAndPort()) || 
FBUtilities.getBroadcastAddressAndPort().compareTo(min) < 0)
              {
diff --git a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java 
b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
index d3320b1568..38c5ef572c 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
@@ -29,11 +29,30 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 
 public class SchemaListener implements ChangeListener
 {
+    // Special instance used during startup to ensure that SSTable files are not opened until
+    // replay of the locally persisted metadata log is complete. Failure to do this can result
+    // in deserialization errors if SSTables written with the schema at epoch X are opened before
+    // the log replay has reached epoch X. After replay is complete, this instance is replaced with
+    // a standard SchemaListener.
+    public static final SchemaListener INSTANCE_FOR_STARTUP = new 
SchemaListener()
+    {
+        @Override
+        public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next)
+        {
+            notifyInternal(prev, next, false);
+        }
+    };
+
     @Override
     public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next)
+    {
+        notifyInternal(prev, next, true);
+    }
+
+    protected void notifyInternal(ClusterMetadata prev, ClusterMetadata next, 
boolean loadSSTables)
     {
         if (!next.schema.lastModified().equals(prev.schema.lastModified()))
-            next.schema.initializeKeyspaceInstances(prev.schema);
+            next.schema.initializeKeyspaceInstances(prev.schema, loadSSTables);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 25942368c8..931e581c63 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -272,10 +272,10 @@ public abstract class LocalLog implements Closeable
                     assert transformed.isSuccess();
                     ClusterMetadata next = transformed.success().metadata;
                     assert pendingEntry.epoch.is(next.epoch) :
-                           String.format("Entry epoch %s does not match 
metadata epoch %s", pendingEntry.epoch, next.epoch);
+                    String.format("Entry epoch %s does not match metadata 
epoch %s", pendingEntry.epoch, next.epoch);
                     assert next.epoch.isDirectlyAfter(prev.epoch) || 
pendingEntry.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT || 
pendingEntry.transform.kind() == Transformation.Kind.PRE_INITIALIZE_CMS :
-                           String.format("Epoch %s for %s can either force 
snapshot, or immediately follow %s",
-                                         next.epoch, pendingEntry.transform, 
prev.epoch);
+                    String.format("Epoch %s for %s can either force snapshot, 
or immediately follow %s",
+                                  next.epoch, pendingEntry.transform, 
prev.epoch);
 
                     persistence.append(transformed.success().metadata.period, 
pendingEntry.maybeUnwrapExecuted());
 
@@ -353,6 +353,11 @@ public abstract class LocalLog implements Closeable
         this.cmListeners.add(listener);
     }
 
+    public void removeListener(ChangeListener listener)
+    {
+        this.cmListeners.remove(listener);
+    }
+
     private static class Async extends LocalLog
     {
         private final AsyncRunnable runnable;
@@ -562,7 +567,7 @@ public abstract class LocalLog implements Closeable
     {
         addListener(snapshotListener());
         addListener(new InitializationListener());
-        addListener(new SchemaListener());
+        addListener(SchemaListener.INSTANCE_FOR_STARTUP);
         addListener(new LegacyStateListener());
         addListener(new PlacementsChangeListener());
         addListener(new MetadataSnapshotListener());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to