This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit af26ae2d6d67ef986364718c6760b053296b7d70 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Mar 3 19:43:03 2023 +0000 [CEP-21] Modify CassandraDaemon Alter CassandraDaemon initialization to accommodate TCM and replay of the cluster metadata log. This is something of a WIP and there is clearly scope to further clean up this part of the code. Co-authored-by: Marcus Eriksson <[email protected]> Co-authored-by: Alex Petrov <[email protected]> Co-authored-by: Sam Tunnicliffe <[email protected]> --- .../apache/cassandra/service/CassandraDaemon.java | 79 ++++++++++------------ 1 file changed, 36 insertions(+), 43 deletions(-) diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 41443d3d1d..c1adb3e36c 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -70,7 +70,8 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.SSTableHeaderFix; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Startup; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.net.StartupClusterConnectivityChecker; @@ -78,11 +79,10 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.security.ThreadAwareSecurityManager; -import org.apache.cassandra.streaming.StreamManager; -import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.InProgressSequence; -import 
org.apache.cassandra.tcm.Startup; +import 
org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JMXServerUtils; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -95,7 +95,6 @@ import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.logging.VirtualTableAppender; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND; import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_JMX_REMOTE_PORT; import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE; import static org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT; @@ -112,6 +111,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NA public class CassandraDaemon { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=NativeAccess"; + public static boolean SKIP_GC_INSPECTOR = Boolean.getBoolean("cassandra.startup.skip_gc_inspector"); private static final Logger logger; @@ -258,9 +258,7 @@ public class CassandraDaemon NativeLibrary.tryMlockall(); DatabaseDescriptor.createAllDirectories(); - Keyspace.setInitialized(); - CommitLog.instance.start(); try @@ -271,7 +269,6 @@ public class CassandraDaemon { throw new AssertionError("Can't initialize cluster metadata service"); } - QueryProcessor.registerStatementInvalidatingListener(); //TODO disabled b/c this involves checking schema but log replay hasn't run yet so it hasn't been constructed @@ -294,20 +291,9 @@ public class CassandraDaemon SystemKeyspaceMigrator41.migrate(); + // TODO (TM/alexp) // Populate token metadata before flushing, for token-aware sstable partitioning (#6696) -// StorageService.instance.populateTokenMetadata(); - -// try -// { -// // load schema from disk -// 
Schema.instance.loadFromDisk(); -// } -// catch (Exception e) -// { -// logger.error("Error while loading schema: ", e); -// throw e; -// } - + // StorageService.instance.populateTokenMetadata(); setupVirtualKeyspaces(); SSTableHeaderFix.fixNonFrozenUDTIfUpgradeFrom30(); @@ -321,13 +307,12 @@ public class CassandraDaemon exitOrFail(e.returnCode, e.getMessage(), e.getCause()); } - Keyspace.setInitialized(); - // initialize keyspaces for (String keyspaceName : Schema.instance.getKeyspaces()) { if (logger.isDebugEnabled()) logger.debug("opening keyspace {}", keyspaceName); + // TODO (TM/alexp) // disable auto compaction until gossip settles since disk boundaries may be affected by ring layout for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores()) { @@ -348,20 +333,25 @@ public class CassandraDaemon logger.warn("Error loading key or row cache", t); } - try - { - GCInspector.register(); - } - catch (Throwable t) + if (!SKIP_GC_INSPECTOR) { - JVMStabilityInspector.inspectThrowable(t); - logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)"); + try + { + GCInspector.register(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)"); + } } // Replay any CommitLogSegments found on disk PaxosState.initializeTrackers(); // replay the log if necessary + // TODO samt - when restarting a previously running instance, this needs to happen after reconstructing schema + // from the cluster metadata log or all mutations will throw IncompatibleSchemaException on deserialisation try { CommitLog.instance.recoverSegmentsOnDisk(); @@ -371,9 +361,6 @@ public class CassandraDaemon throw new RuntimeException(e); } - // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293) -// StorageService.instance.populateTokenMetadata(); - try { PaxosState.maybeRebuildUncommittedState(); @@ 
-384,7 +371,7 @@ public class CassandraDaemon } // Clean up system.size_estimates entries left lying around from missed keyspace drops (CASSANDRA-14905) - StorageService.instance.cleanupSizeEstimates(); + SystemKeyspace.clearAllEstimates(); // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF // set cassandra.size_recorder_interval to 0 to disable @@ -440,6 +427,9 @@ public class CassandraDaemon exitOrFail(1, "Fatal configuration error", e); } + ClusterMetadataService.instance().replayAndWait(); + + // TODO: (TM/alexp), this can be made time-dependent // Because we are writing to the system_distributed keyspace, this should happen after that is created, which // happens in StorageService.instance.initServer() Runnable viewRebuild = () -> { @@ -452,8 +442,9 @@ public class CassandraDaemon ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY_MILLIS, TimeUnit.MILLISECONDS); - if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress())) - Gossiper.waitToSettle(); + // TODO: (TM/alexp), we do not need to wait for gossip settlement anymore +// if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress())) +// Gossiper.waitToSettle(); StorageService.instance.doAuthSetup(false); @@ -464,7 +455,7 @@ public class CassandraDaemon { for (final ColumnFamilyStore store : cfs.concatWithIndexes()) { - store.reload(); //reload CFs in case there was a change of disk boundaries + store.reload(store.metadata()); //reload CFs in case there was a change of disk boundaries if (store.getCompactionStrategyManager().shouldBeEnabled()) { if (DatabaseDescriptor.getAutocompactionOnStartupEnabled()) @@ -722,6 +713,7 @@ public class CassandraDaemon if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) { startNativeTransport(); + // TODO: we should represent this state in transactional 
metadata to avoid relying on gossip StorageService.instance.setRpcReady(true); } else @@ -739,7 +731,7 @@ public class CassandraDaemon // jsvc takes care of taking the rest down logger.info("Cassandra shutting down..."); destroyClientTransports(); - StorageService.instance.setRpcReady(false); + //StorageService.instance.setRpcReady(false); if (jmxServer != null) { @@ -790,11 +782,12 @@ public class CassandraDaemon new File(pidFile).deleteOnExit(); } - if (CASSANDRA_FOREGROUND.getString() == null) - { - System.out.close(); - System.err.close(); - } + // TODO: this should definitely be done differently +// if (CASSANDRA_FOREGROUND.getString() == null) +// { +// System.out.close(); +// System.err.close(); +// } start(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
