This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f9f5faa1a7714bc3b383e65bc2a32da55a4e7835
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Mar 3 10:03:41 2023 +0000

    [CEP-21] Restore operation mode reporting (7/7)
    
    Part 7 of 7 brings StorageService.operationMode back into sync with
    previous behaviour. Many external coordination tools depend on accessing
    this state via JMX, so this is an important external interface.
    
    This commit also adds a virtual version of the system.local table, as we
    can fully construct the data for this from ClusterMetadata, meaning we no
    longer need the on-disk system table, though this is retained for now. In
    future, more system tables can be virtualised (system.peers,
    system_schema, etc).
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../org/apache/cassandra/db/SystemKeyspace.java    |  32 ++++-
 .../apache/cassandra/db/virtual/LocalTable.java    | 145 +++++++++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 3 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index cc4eafdf39..2a10af1dc5 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -98,7 +98,6 @@ import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.schema.Views;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.Commit.Accepted;
@@ -109,8 +108,10 @@ import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
 import org.apache.cassandra.service.paxos.uncommitted.PaxosUncommittedIndex;
 import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Sealed;
+import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -599,7 +600,27 @@ public final class SystemKeyspace
         NEEDS_BOOTSTRAP,
         COMPLETED,
         IN_PROGRESS,
-        DECOMMISSIONED
+        DECOMMISSIONED;
+
+        /**
+         * Maps a cluster metadata {@link NodeState} onto the matching BootstrapState.
+         * A {@code null} state, like any state not listed below, yields DECOMMISSIONED.
+         */
+        public static BootstrapState fromNodeState(NodeState nodeState)
+        {
+            // todo, handle a missing (null) state properly
+            if (nodeState == null || nodeState == NodeState.LEFT)
+                return DECOMMISSIONED;
+            if (nodeState == NodeState.REGISTERED)
+                return NEEDS_BOOTSTRAP;
+            if (nodeState == NodeState.BOOTSTRAPPING)
+                return IN_PROGRESS;
+            boolean joined = nodeState == NodeState.JOINED
+                             || nodeState == NodeState.LEAVING
+                             || nodeState == NodeState.MOVING;
+            // every remaining state falls through to DECOMMISSIONED
+            return joined ? COMPLETED : DECOMMISSIONED;
+        }
     }
 
     public static void persistLocalMetadata()
@@ -913,11 +934,11 @@ public final class SystemKeyspace
         executeInternal(format(req, LOCAL, LOCAL), version);
     }
 
-    private static Set<String> tokensAsSet(Collection<Token> tokens)
+    public static Set<String> tokensAsSet(Collection<Token> tokens)
     {
         if (tokens.isEmpty())
             return Collections.emptySet();
-        Token.TokenFactory factory = StorageService.instance.getTokenFactory();
+        Token.TokenFactory factory = 
ClusterMetadata.current().partitioner.getTokenFactory();
         Set<String> s = new HashSet<>(tokens.size());
         for (Token tk : tokens)
             s.add(factory.toString(tk));
@@ -926,7 +947,7 @@ public final class SystemKeyspace
 
     private static Collection<Token> deserializeTokens(Collection<String> 
tokensStrings)
     {
-        Token.TokenFactory factory = StorageService.instance.getTokenFactory();
+        Token.TokenFactory factory = 
ClusterMetadata.current().partitioner.getTokenFactory();
         List<Token> tokens = new ArrayList<>(tokensStrings.size());
         for (String tk : tokensStrings)
             tokens.add(factory.fromString(tk));
@@ -1262,6 +1283,7 @@ public final class SystemKeyspace
      * Read the host ID from the system keyspace, creating (and storing) one if
      * none exists.
      */
+    // TODO: this method should not exist. Only CMS can give out host ids.
     public static synchronized UUID getOrInitializeLocalHostId()
     {
         return getOrInitializeLocalHostId(UUID::randomUUID);
diff --git a/src/java/org/apache/cassandra/db/virtual/LocalTable.java 
b/src/java/org/apache/cassandra/db/virtual/LocalTable.java
new file mode 100644
index 0000000000..2c0cc11e10
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/LocalTable.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
+
+/**
+ * Virtual table exposing a single row that describes the local node.
+ * <p>
+ * Most values are read from the current {@link ClusterMetadata}; the cluster name, CQL
+ * version, native protocol version, gossip generation and draining status come from
+ * {@link DatabaseDescriptor}, {@link QueryProcessor}, {@link ProtocolVersion},
+ * {@link Gossiper} and {@link StorageService} respectively.
+ */
+public class LocalTable extends AbstractVirtualTable
+{
+    public static final String BOOTSTRAPPED = "bootstrapped";
+    public static final String BROADCAST_ADDRESS = "broadcast_address";
+    public static final String BROADCAST_PORT = "broadcast_port";
+    public static final String CLUSTER_NAME = "cluster_name";
+    public static final String LISTEN_ADDRESS = "listen_address";
+    public static final String LISTEN_PORT = "listen_port";
+    public static final String CQL_VERSION = "cql_version";
+    public static final String DATACENTER = "datacenter";
+    public static final String RACK = "rack";
+    public static final String HOST_ID = "host_id";
+    public static final String GOSSIP_GENERATION = "gossip_generation";
+    public static final String RELEASE_VERSION = "release_version";
+    public static final String NATIVE_ADDRESS = "native_address";
+    public static final String NATIVE_PORT = "native_port";
+    public static final String NATIVE_PROTOCOL_VERSION = "native_protocol_version";
+    public static final String PARTITIONER = "partitioner";
+    public static final String SCHEMA_VERSION = "schema_version";
+    public static final String TOKENS = "tokens";
+    public static final String STATE = "state";
+    public static final String STATUS = "status";
+    public static final String KEY = "local";
+    public static final String TRUNCATED_AT = "truncated_at";
+
+    /**
+     * Defines the schema of the {@code local} virtual table.
+     *
+     * @param keyspace name of the virtual keyspace this table belongs to
+     */
+    public LocalTable(String keyspace)
+    {
+        super(TableMetadata.builder(keyspace, "local")
+                           .comment("Information about local node")
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           // the partitioner must compare keys using the partition key's own type
+                           .partitioner(new LocalPartitioner(UTF8Type.instance))
+                           .addPartitionKeyColumn(KEY, UTF8Type.instance)
+                           .addRegularColumn(BOOTSTRAPPED, UTF8Type.instance)
+                           .addRegularColumn(BROADCAST_ADDRESS, InetAddressType.instance)
+                           .addRegularColumn(BROADCAST_PORT, Int32Type.instance)
+                           .addRegularColumn(CLUSTER_NAME, UTF8Type.instance)
+                           .addRegularColumn(CQL_VERSION, UTF8Type.instance)
+                           .addRegularColumn(DATACENTER, UTF8Type.instance)
+                           .addRegularColumn(GOSSIP_GENERATION, Int32Type.instance)
+                           .addRegularColumn(HOST_ID, UUIDType.instance)
+                           .addRegularColumn(LISTEN_ADDRESS, InetAddressType.instance)
+                           .addRegularColumn(LISTEN_PORT, Int32Type.instance)
+                           .addRegularColumn(NATIVE_ADDRESS, InetAddressType.instance)
+                           .addRegularColumn(NATIVE_PORT, Int32Type.instance)
+                           .addRegularColumn(NATIVE_PROTOCOL_VERSION, UTF8Type.instance)
+                           .addRegularColumn(PARTITIONER, UTF8Type.instance)
+                           .addRegularColumn(RACK, UTF8Type.instance)
+                           .addRegularColumn(RELEASE_VERSION, UTF8Type.instance)
+                           .addRegularColumn(SCHEMA_VERSION, UUIDType.instance)
+                           .addRegularColumn(STATE, UTF8Type.instance)
+                           .addRegularColumn(STATUS, UTF8Type.instance)
+                           .addRegularColumn(TOKENS, SetType.getInstance(UTF8Type.instance, false))
+//                           .addRegularColumn(TRUNCATED_AT, MapType.getInstance(UUIDType.instance, UTF8Type.instance, false)) todo?
+                           .build());
+    }
+
+    /**
+     * Builds the single row, keyed by {@link #KEY}, describing this node as of the current
+     * {@link ClusterMetadata}.
+     *
+     * @return a data set containing one row for the local node
+     */
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        ClusterMetadata cm = ClusterMetadata.current();
+        NodeId peer = cm.myNodeId();
+        // NOTE(review): fromNodeState tolerates a null state, but every other lookup below is
+        // dereferenced unconditionally - presumably the local node is always present in the
+        // directory by the time this table can be queried; confirm.
+        NodeState nodeState = cm.directory.peerState(peer);
+        NodeAddresses addresses = cm.directory.getNodeAddresses(peer);
+        Location location = cm.directory.location(peer);
+        result.row(KEY)
+              .column(BOOTSTRAPPED, SystemKeyspace.BootstrapState.fromNodeState(nodeState).toString())
+              .column(BROADCAST_ADDRESS, addresses.broadcastAddress.getAddress())
+              .column(BROADCAST_PORT, addresses.broadcastAddress.getPort())
+              .column(CLUSTER_NAME, DatabaseDescriptor.getClusterName())
+              .column(CQL_VERSION, QueryProcessor.CQL_VERSION.toString())
+              .column(DATACENTER, location.datacenter)
+              .column(GOSSIP_GENERATION, Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddressAndPort()))
+              .column(HOST_ID, peer.uuid)
+              .column(LISTEN_ADDRESS, addresses.localAddress.getAddress())
+              .column(LISTEN_PORT, addresses.localAddress.getPort())
+              .column(NATIVE_ADDRESS, addresses.nativeAddress.getAddress())
+              .column(NATIVE_PORT, addresses.nativeAddress.getPort())
+              .column(NATIVE_PROTOCOL_VERSION, String.valueOf(ProtocolVersion.CURRENT.asInt()))
+              .column(PARTITIONER, cm.partitioner.getClass().getName())
+              .column(RACK, location.rack)
+              .column(RELEASE_VERSION, cm.directory.version(peer).cassandraVersion.toString())
+              .column(SCHEMA_VERSION, cm.schema.getVersion())
+              .column(STATE, nodeState.toString())
+              .column(STATUS, status(cm))
+              // render tokens through the partitioner's TokenFactory (as SystemKeyspace.tokensAsSet does)
+              // rather than relying on toString() of the raw token value
+              .column(TOKENS, cm.tokenMap.tokens(peer)
+                                         .stream()
+                                         .map(token -> cm.partitioner.getTokenFactory().toString(token.getToken()))
+                                         .collect(Collectors.toCollection(HashSet::new)));
+              //.column(TRUNCATED_AT, status(cm)); // todo?
+
+        return result;
+    }
+
+    /**
+     * Reports the DRAINING or DRAINED mode while {@link StorageService} says the node is in that
+     * phase; otherwise the state the directory holds for this node's broadcast address.
+     *
+     * @param cm cluster metadata to read the node state from
+     * @return the string stored in the {@link #STATUS} column
+     */
+    private static String status(ClusterMetadata cm)
+    {
+        if (StorageService.instance.isDraining())
+            return StorageService.Mode.DRAINING.toString();
+        if (StorageService.instance.isDrained())
+            return StorageService.Mode.DRAINED.toString();
+        return cm.directory.peerState(getBroadcastAddressAndPort()).toString();
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 12df6c7790..7ebed45ac2 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -50,6 +50,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new QueriesTable(VIRTUAL_VIEWS))
                     .add(new LogMessagesTable(VIRTUAL_VIEWS))
                     .add(new SnapshotsTable(VIRTUAL_VIEWS))
+                    .add(new LocalTable(VIRTUAL_VIEWS))
                     .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
                     .build());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to