This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 633b40766b046cd852f312e51c4b6dbc60e49add Author: Sam Tunnicliffe <[email protected]> AuthorDate: Thu Mar 30 16:52:13 2023 +0100 [CEP-21] During startup, don't open SSTables until local metadata log replay is complete patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov for CASSANDRA-18458 --- .../org/apache/cassandra/db/ColumnFamilyStore.java | 18 ++++- .../apache/cassandra/schema/DistributedSchema.java | 6 +- src/java/org/apache/cassandra/tcm/Startup.java | 79 ++++++++++++---------- .../cassandra/tcm/listeners/SchemaListener.java | 21 +++++- .../org/apache/cassandra/tcm/log/LocalLog.java | 13 ++-- 5 files changed, 91 insertions(+), 46 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 99e0bcde68..da85799a7a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -506,6 +506,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner initialMemtable = createMemtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition())); memtableMetrics = memtableFactory.createMemtableMetrics(metadata); } + metric = new TableMetrics(this, memtableMetrics); data = new Tracker(this, initialMemtable, loadSSTables); // Note that this needs to happen before we load the first sstables, or the global sstable tracker will not @@ -537,8 +538,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner indexManager.addIndex(info, true); } - metric = new TableMetrics(this, memtableMetrics); - if (data.loadsstables) { data.updateInitialSSTableSize(sstables); @@ -570,6 +569,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner topPartitions = new TopPartitionTracker(initMetadata); } + public void loadInitialSSTables() + { + if (data.loadsstables) + { + logger.info("Attempted to load initial SSTables for {}.{} 
but this was done during construction, ignoring", + keyspace.getName(), name); + return; + } + Collection<SSTableReader> sstables; + Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); + sstables = SSTableReader.openAll(this, sstableFiles.list().entrySet(), metadata); + data.addInitialSSTablesWithoutUpdatingSize(sstables, this); + data.updateInitialSSTableSize(sstables); + } + public static String getTableMBeanName(String ks, String name, boolean isIndex) { return String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s", diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index f2e2356f9d..3ce318dd59 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -146,7 +146,7 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> delta.tables.dropped.forEach(t -> dropTable(keyspace, t, true)); // add tables and views - delta.tables.created.forEach(t -> createTable(keyspace, t)); + delta.tables.created.forEach(t -> createTable(keyspace, t, loadSSTables)); delta.views.created.forEach(v -> createView(keyspace, v)); // update tables and views @@ -238,10 +238,10 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> SchemaDiagnostics.tableDropped(Schema.instance, metadata); } - private void createTable(Keyspace keyspace, TableMetadata table) + private void createTable(Keyspace keyspace, TableMetadata table, boolean loadSSTables) { SchemaDiagnostics.tableCreating(Schema.instance, table); - keyspace.initCf(table, true); + keyspace.initCf(table, loadSSTables); SchemaDiagnostics.tableCreated(Schema.instance, table); } diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index a49a3d088e..5600dd12d3 100644 --- 
a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -18,37 +18,40 @@ package org.apache.cassandra.tcm; - import java.io.IOException; - import java.util.Collections; - import java.util.Map; - import java.util.Set; - import java.util.concurrent.ExecutionException; - import java.util.concurrent.TimeUnit; - import java.util.function.Function; - - import com.google.common.util.concurrent.Uninterruptibles; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - - import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.db.SystemKeyspace; - import org.apache.cassandra.db.commitlog.CommitLog; - import org.apache.cassandra.gms.EndpointState; - import org.apache.cassandra.gms.Gossiper; - import org.apache.cassandra.gms.NewGossiper; - import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.net.MessagingService; - import org.apache.cassandra.schema.DistributedSchema; - import org.apache.cassandra.tcm.compatibility.GossipHelper; - import org.apache.cassandra.tcm.log.SystemKeyspaceStorage; - import org.apache.cassandra.tcm.migration.Election; - import org.apache.cassandra.tcm.ownership.UniformRangePlacement; - import org.apache.cassandra.tcm.transformations.cms.Initialize; - import org.apache.cassandra.utils.FBUtilities; - - import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; - import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; - import static org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.NewGossiper; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.tcm.compatibility.GossipHelper; +import org.apache.cassandra.tcm.listeners.SchemaListener; +import org.apache.cassandra.tcm.log.SystemKeyspaceStorage; +import org.apache.cassandra.tcm.migration.Election; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.transformations.cms.Initialize; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; +import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; +import static org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates; public class Startup { @@ -103,7 +106,6 @@ package org.apache.cassandra.tcm; { ClusterMetadataService.instance().log().bootstrap(FBUtilities.getBroadcastAddressAndPort()); assert ClusterMetadataService.state() == LOCAL : String.format("Can't initialize as node hasn't transitioned to CMS state. 
State: %s.\n%s", ClusterMetadataService.state(), ClusterMetadata.current()); - Initialize initialize = new Initialize(ClusterMetadata.current()); ClusterMetadataService.instance().commit(initialize); } @@ -116,23 +118,28 @@ package org.apache.cassandra.tcm; initial, wrapProcessor, ClusterMetadataService::state)); - ClusterMetadataService.instance().initRecentlySealedPeriodsIndex(); ClusterMetadataService.instance().log().replayPersisted(); + ClusterMetadataService.instance().log().removeListener(SchemaListener.INSTANCE_FOR_STARTUP); + DistributedSchema schema = ClusterMetadata.current().schema; + schema.getKeyspaces().forEach(ksm -> { + Keyspace ks = schema.getKeyspace(ksm.name); + ks.getColumnFamilyStores().forEach(cfs -> { + cfs.concatWithIndexes().forEach(ColumnFamilyStore::loadInitialSSTables); + }); + }); + ClusterMetadataService.instance().log().addListener(new SchemaListener()); } public static void initializeForDiscovery(Runnable initMessaging) { initMessaging.run(); - logger.debug("Discovering other nodes in the system"); Discovery.DiscoveredNodes candidates = Discovery.instance.discover(); - if (candidates.kind() == Discovery.DiscoveredNodes.Kind.KNOWN_PEERS) { logger.debug("Got candidates: " + candidates); InetAddressAndPort min = candidates.nodes().stream().min(InetAddressAndPort::compareTo).get(); - // identify if you need to start the vote if (min.equals(FBUtilities.getBroadcastAddressAndPort()) || FBUtilities.getBroadcastAddressAndPort().compareTo(min) < 0) { diff --git a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java index d3320b1568..38c5ef572c 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java @@ -29,11 +29,30 @@ import org.apache.cassandra.tcm.ClusterMetadata; public class SchemaListener implements ChangeListener { + // Special instance used during startup to ensure that 
SSTable files are not opened until + // replay of the locally persisted metadata log is complete. Failure to do this can result + // in deserialization errors if SSTables written with schema at epoch X are opened before + // the log replay has replayed X. After replay is complete, this instance is replaced with + // a standard SchemaListener. + public static final SchemaListener INSTANCE_FOR_STARTUP = new SchemaListener() + { + @Override + public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next) + { + notifyInternal(prev, next, false); + } + }; + @Override public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next) + { + notifyInternal(prev, next, true); + } + + protected void notifyInternal(ClusterMetadata prev, ClusterMetadata next, boolean loadSSTables) { if (!next.schema.lastModified().equals(prev.schema.lastModified())) - next.schema.initializeKeyspaceInstances(prev.schema); + next.schema.initializeKeyspaceInstances(prev.schema, loadSSTables); } @Override diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 25942368c8..931e581c63 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -272,10 +272,10 @@ public abstract class LocalLog implements Closeable assert transformed.isSuccess(); ClusterMetadata next = transformed.success().metadata; assert pendingEntry.epoch.is(next.epoch) : - String.format("Entry epoch %s does not match metadata epoch %s", pendingEntry.epoch, next.epoch); + String.format("Entry epoch %s does not match metadata epoch %s", pendingEntry.epoch, next.epoch); assert next.epoch.isDirectlyAfter(prev.epoch) || pendingEntry.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT || pendingEntry.transform.kind() == Transformation.Kind.PRE_INITIALIZE_CMS : - String.format("Epoch %s for %s can either force snapshot, or immediately follow %s", - next.epoch, pendingEntry.transform, 
prev.epoch); + String.format("Epoch %s for %s can either force snapshot, or immediately follow %s", + next.epoch, pendingEntry.transform, prev.epoch); persistence.append(transformed.success().metadata.period, pendingEntry.maybeUnwrapExecuted()); @@ -353,6 +353,11 @@ public abstract class LocalLog implements Closeable this.cmListeners.add(listener); } + public void removeListener(ChangeListener listener) + { + this.cmListeners.remove(listener); + } + private static class Async extends LocalLog { private final AsyncRunnable runnable; @@ -562,7 +567,7 @@ public abstract class LocalLog implements Closeable { addListener(snapshotListener()); addListener(new InitializationListener()); - addListener(new SchemaListener()); + addListener(SchemaListener.INSTANCE_FOR_STARTUP); addListener(new LegacyStateListener()); addListener(new PlacementsChangeListener()); addListener(new MetadataSnapshotListener()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
