This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f9f5faa1a7714bc3b383e65bc2a32da55a4e7835 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Mar 3 10:03:41 2023 +0000 [CEP-21] Restore operation mode reporting (7/7) Part 7 of 7 brings StorageService.operationMode back into sync with previous behaviour. Many external coordination tools depend on accessing this state via JMX, so this is an important external interface. This commit also adds a virtual version of the system.local table, as we can fully construct the data for this from ClusterMetadata, meaning we no longer need the on-disk system table, though this is retained for now. In future, more system tables can be virtualised (system.peers, system_schema, etc). Co-authored-by: Marcus Eriksson <[email protected]> Co-authored-by: Alex Petrov <[email protected]> Co-authored-by: Sam Tunnicliffe <[email protected]> --- .../org/apache/cassandra/db/SystemKeyspace.java | 32 ++++- .../apache/cassandra/db/virtual/LocalTable.java | 145 +++++++++++++++++++++ .../cassandra/db/virtual/SystemViewsKeyspace.java | 1 + 3 files changed, 173 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index cc4eafdf39..2a10af1dc5 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -98,7 +98,6 @@ import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.schema.Views; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.Commit.Accepted; @@ -109,8 +108,10 @@ import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.uncommitted.PaxosRows; import org.apache.cassandra.service.paxos.uncommitted.PaxosUncommittedIndex; import 
org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Sealed; +import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CassandraVersion; @@ -599,7 +600,27 @@ public final class SystemKeyspace NEEDS_BOOTSTRAP, COMPLETED, IN_PROGRESS, - DECOMMISSIONED + DECOMMISSIONED; + + public static BootstrapState fromNodeState(NodeState nodeState) + { + if (nodeState == null) // todo, handle this properly + return DECOMMISSIONED; + switch (nodeState) + { + case REGISTERED: + return NEEDS_BOOTSTRAP; + case BOOTSTRAPPING: + return IN_PROGRESS; + case JOINED: + case LEAVING: + case MOVING: + return COMPLETED; + case LEFT: + default: + return DECOMMISSIONED; + } + } } public static void persistLocalMetadata() @@ -913,11 +934,11 @@ public final class SystemKeyspace executeInternal(format(req, LOCAL, LOCAL), version); } - private static Set<String> tokensAsSet(Collection<Token> tokens) + public static Set<String> tokensAsSet(Collection<Token> tokens) { if (tokens.isEmpty()) return Collections.emptySet(); - Token.TokenFactory factory = StorageService.instance.getTokenFactory(); + Token.TokenFactory factory = ClusterMetadata.current().partitioner.getTokenFactory(); Set<String> s = new HashSet<>(tokens.size()); for (Token tk : tokens) s.add(factory.toString(tk)); @@ -926,7 +947,7 @@ public final class SystemKeyspace private static Collection<Token> deserializeTokens(Collection<String> tokensStrings) { - Token.TokenFactory factory = StorageService.instance.getTokenFactory(); + Token.TokenFactory factory = ClusterMetadata.current().partitioner.getTokenFactory(); List<Token> tokens = new ArrayList<>(tokensStrings.size()); for (String tk : tokensStrings) tokens.add(factory.fromString(tk)); @@ -1262,6 +1283,7 @@ public final class SystemKeyspace 
* Read the host ID from the system keyspace, creating (and storing) one if * none exists. */ + // TODO: this method should not exist. Only CMS can give out host ids. public static synchronized UUID getOrInitializeLocalHostId() { return getOrInitializeLocalHostId(UUID::randomUUID); diff --git a/src/java/org/apache/cassandra/db/virtual/LocalTable.java b/src/java/org/apache/cassandra/db/virtual/LocalTable.java new file mode 100644 index 0000000000..2c0cc11e10 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/LocalTable.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.virtual; + +import java.util.HashSet; +import java.util.stream.Collectors; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.marshal.InetAddressType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; + +public class LocalTable extends AbstractVirtualTable +{ + public static final String BOOTSTRAPPED = "bootstrapped"; + public static final String BROADCAST_ADDRESS = "broadcast_address"; + public static final String BROADCAST_PORT = "broadcast_port"; + public static final String CLUSTER_NAME = "cluster_name"; + public static final String LISTEN_ADDRESS = "listen_address"; + public static final String LISTEN_PORT = "listen_port"; + public static final String CQL_VERSION = "cql_version"; + public static final String DATACENTER = "datacenter"; + public static final String RACK = "rack"; + public static final String HOST_ID = "host_id"; + public static final String GOSSIP_GENERATION = "gossip_generation"; + public static final String RELEASE_VERSION = "release_version"; + public static final String 
NATIVE_ADDRESS = "native_address"; + public static final String NATIVE_PORT = "native_port"; + public static final String NATIVE_PROTOCOL_VERSION = "native_protocol_version"; + public static final String PARTITIONER = "partitioner"; + public static final String SCHEMA_VERSION = "schema_version"; + public static final String TOKENS = "tokens"; + public static final String STATE = "state"; + public static final String STATUS = "status"; + public static final String KEY = "local"; + public static final String TRUNCATED_AT = "truncated_at"; + + public LocalTable(String keyspace) + { + super(TableMetadata.builder(keyspace, "local") + .comment("Information about local node") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(InetAddressType.instance)) + .addPartitionKeyColumn(KEY, UTF8Type.instance) + .addRegularColumn(BOOTSTRAPPED, UTF8Type.instance) + .addRegularColumn(BROADCAST_ADDRESS, InetAddressType.instance) + .addRegularColumn(BROADCAST_PORT, Int32Type.instance) + .addRegularColumn(CLUSTER_NAME, UTF8Type.instance) + .addRegularColumn(CQL_VERSION, UTF8Type.instance) + .addRegularColumn(DATACENTER, UTF8Type.instance) + .addRegularColumn(GOSSIP_GENERATION, Int32Type.instance) + .addRegularColumn(HOST_ID, UUIDType.instance) + .addRegularColumn(LISTEN_ADDRESS, InetAddressType.instance) + .addRegularColumn(LISTEN_PORT, Int32Type.instance) + .addRegularColumn(NATIVE_ADDRESS, InetAddressType.instance) + .addRegularColumn(NATIVE_PORT, Int32Type.instance) + .addRegularColumn(NATIVE_PROTOCOL_VERSION, UTF8Type.instance) + .addRegularColumn(PARTITIONER, UTF8Type.instance) + .addRegularColumn(RACK, UTF8Type.instance) + .addRegularColumn(RELEASE_VERSION, UTF8Type.instance) + .addRegularColumn(SCHEMA_VERSION, UUIDType.instance) + .addRegularColumn(STATE, UTF8Type.instance) + .addRegularColumn(STATUS, UTF8Type.instance) + .addRegularColumn(TOKENS, SetType.getInstance(UTF8Type.instance, false)) +// .addRegularColumn(TRUNCATED_AT, 
MapType.getInstance(UUIDType.instance, UTF8Type.instance, false)) todo? + .build()); + } + + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + ClusterMetadata cm = ClusterMetadata.current(); + NodeId peer = cm.myNodeId(); + NodeState nodeState = cm.directory.peerState(peer); + NodeAddresses addresses = cm.directory.getNodeAddresses(peer); + Location location = cm.directory.location(peer); + result.row(KEY) + .column(BOOTSTRAPPED, SystemKeyspace.BootstrapState.fromNodeState(nodeState).toString()) + .column(BROADCAST_ADDRESS, addresses.broadcastAddress.getAddress()) + .column(BROADCAST_PORT, addresses.broadcastAddress.getPort()) + .column(CLUSTER_NAME, DatabaseDescriptor.getClusterName()) + .column(CQL_VERSION, QueryProcessor.CQL_VERSION.toString()) + .column(DATACENTER, location.datacenter) + .column(GOSSIP_GENERATION, Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddressAndPort())) + .column(HOST_ID, peer.uuid) + .column(LISTEN_ADDRESS, addresses.localAddress.getAddress()) + .column(LISTEN_PORT, addresses.localAddress.getPort()) + .column(NATIVE_ADDRESS, addresses.nativeAddress.getAddress()) + .column(NATIVE_PORT, addresses.nativeAddress.getPort()) + .column(NATIVE_PROTOCOL_VERSION, String.valueOf(ProtocolVersion.CURRENT.asInt())) + .column(PARTITIONER, cm.partitioner.getClass().getName()) + .column(RACK, location.rack) + .column(RELEASE_VERSION, cm.directory.version(peer).cassandraVersion.toString()) + .column(SCHEMA_VERSION, cm.schema.getVersion()) + .column(STATE, cm.directory.peerState(peer).toString()) + .column(STATUS, status(cm)) + .column(TOKENS, new HashSet<>(cm.tokenMap.tokens(peer).stream().map((token) -> token.getToken().getTokenValue().toString()).collect(Collectors.toList()))); + //.column(TRUNCATED_AT, status(cm)); // todo? 
+ + return result; + } + + private static String status(ClusterMetadata cm) + { + if (StorageService.instance.isDraining()) + return StorageService.Mode.DRAINING.toString(); + if (StorageService.instance.isDrained()) + return StorageService.Mode.DRAINED.toString(); + return cm.directory.peerState(getBroadcastAddressAndPort()).toString(); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 12df6c7790..7ebed45ac2 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -50,6 +50,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace .add(new QueriesTable(VIRTUAL_VIEWS)) .add(new LogMessagesTable(VIRTUAL_VIEWS)) .add(new SnapshotsTable(VIRTUAL_VIEWS)) + .add(new LocalTable(VIRTUAL_VIEWS)) .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS)) .build()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
