This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61be4d8 expose gossip information in system_views.gossip_info virtual
table
61be4d8 is described below
commit 61be4d836213f708d9a29e59b9ef1df0bebef29a
Author: Francisco Guerrero <[email protected]>
AuthorDate: Wed Mar 16 01:31:00 2022 +0100
expose gossip information in system_views.gossip_info virtual table
patch by Francisco Guerrero; reviewed by Stefan Miklosovic and Yifan Cai
for CASSANDRA-17002
This commit adds a new virtual table that exposes the gossip information in
tabular format.
The information is the same as the information presented through the
`nodetool gossipinfo`
command, but the virtual table splits the version and value from
`VersionedValue` into two
different columns. This is intended to help clients reading the vtable
without the need of
parsing the version:value information (as it currently stands in
gossipinfo).
The token value does not have a column. This is consistent with the
gossipinfo output which
always renders ":<hidden>" for the Token value. Only the token_version
column is available.
---
CHANGES.txt | 1 +
doc/modules/cassandra/pages/new/virtualtables.adoc | 22 ++-
.../cassandra/db/virtual/GossipInfoTable.java | 174 +++++++++++++++++++
.../cassandra/db/virtual/SystemViewsKeyspace.java | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 5 +-
.../org/apache/cassandra/gms/HeartBeatState.java | 4 +-
.../cassandra/db/virtual/GossipInfoTableTest.java | 192 +++++++++++++++++++++
7 files changed, 394 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 21c163e..3fc2300 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Expose gossip information in system_views.gossip_info virtual table
(CASSANDRA-17002)
* Add guardrails for collection items and size (CASSANDRA-17153)
* Improve guardrails messages (CASSANDRA-17430)
* Remove all usages of junit.framework and ban them via Checkstyle
(CASSANDRA-17316)
diff --git a/doc/modules/cassandra/pages/new/virtualtables.adoc
b/doc/modules/cassandra/pages/new/virtualtables.adoc
index 69035ae..6f562d1 100644
--- a/doc/modules/cassandra/pages/new/virtualtables.adoc
+++ b/doc/modules/cassandra/pages/new/virtualtables.adoc
@@ -74,10 +74,12 @@ recent_hit_rate_per_second, recent_request_rate_per_second,
request_count, and s
|coordinator_write_latency |Records counts, keyspace_name, table_name, max,
median, and per_second for coordinator writes.
-|cql_metrcs |Metrics specific to CQL prepared statement caching.
+|cql_metrics |Metrics specific to CQL prepared statement caching.
|disk_usage |Records disk usage including disk_space, keyspace_name, and
table_name, sorted by system keyspaces.
+|gossip_info |Lists the gossip information for the cluster.
+
|internode_inbound |Lists information about the inbound internode messaging.
|internode_outbound |Information about the outbound internode messaging.
@@ -377,6 +379,24 @@ SELECT total - progress AS remaining
FROM system_views.sstable_tasks;
....
+=== Gossip Information Virtual Table
+
+The `gossip_info` virtual table lists the Gossip information for the cluster.
An example query is as follows:
+
+....
+cqlsh> select address, port, generation, heartbeat, load, dc, rack from
system_views.gossip_info;
+
+ address | port | generation | heartbeat | load | dc | rack
+-----------+------+------------+-----------+---------+-------------+-------
+ 127.0.0.1 | 7000 | 1645575140 | 312 | 70542.0 | datacenter1 | rack1
+ 127.0.0.2 | 7000 | 1645575135 | 318 | 70499.0 | datacenter1 | rack1
+ 127.0.0.3 | 7000 | 1645575140 | 312 | 70504.0 | datacenter1 | rack1
+ 127.0.0.4 | 7000 | 1645575141 | 311 | 70502.0 | datacenter1 | rack1
+ 127.0.0.5 | 7000 | 1645575136 | 315 | 70500.0 | datacenter1 | rack1
+
+(5 rows)
+....
+
=== Other Virtual Tables
Some examples of using other virtual tables are as follows.
diff --git a/src/java/org/apache/cassandra/db/virtual/GossipInfoTable.java
b/src/java/org/apache/cassandra/db/virtual/GossipInfoTable.java
new file mode 100644
index 0000000..a515feb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/GossipInfoTable.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.gms.ApplicationState.TOKENS;
+
+/**
+ * A {@link VirtualTable} that returns the Gossip information in tabular format.
+ */
+final class GossipInfoTable extends AbstractVirtualTable
+{
+    static final String TABLE_NAME = "gossip_info";
+    static final String TABLE_COMMENT = "lists the gossip information for the cluster";
+
+    static final String ADDRESS = "address";
+    static final String PORT = "port";
+    static final String HOSTNAME = "hostname";
+    static final String GENERATION = "generation";
+    static final String HEARTBEAT = "heartbeat";
+
+    /** Every application state gets a {@code <state>_version} column. */
+    static final ApplicationState[] STATES_FOR_VERSIONS = ApplicationState.values();
+    /** Every application state except {@code TOKENS} gets a value column. */
+    static final ApplicationState[] STATES_FOR_VALUES;
+
+    static
+    {
+        EnumSet<ApplicationState> applicationStates = EnumSet.allOf(ApplicationState.class);
+        // do not add a column for the ApplicationState.TOKENS value
+        applicationStates.remove(TOKENS);
+        STATES_FOR_VALUES = applicationStates.toArray(new ApplicationState[0]);
+    }
+
+    /**
+     * Construct a new {@link GossipInfoTable} for the given {@code keyspace}.
+     *
+     * @param keyspace the name of the keyspace
+     */
+    GossipInfoTable(String keyspace)
+    {
+        super(buildTableMetadata(keyspace));
+    }
+
+    /**
+     * Builds one row per entry of {@code Gossiper.instance.endpointStateMap}, keyed by the
+     * endpoint address and port, with the hostname, heartbeat generation/version, and one
+     * value and one version column per application state (no value column for TOKENS).
+     *
+     * @return the data set holding the gossip rows
+     */
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet())
+        {
+            InetAddressAndPort endpoint = entry.getKey();
+            EndpointState localState = entry.getValue();
+
+            SimpleDataSet dataSet = result.row(endpoint.getAddress(), endpoint.getPort())
+                                          .column(HOSTNAME, endpoint.getHostName())
+                                          .column(GENERATION, getGeneration(localState))
+                                          .column(HEARTBEAT, getHeartBeat(localState));
+
+            for (ApplicationState state : STATES_FOR_VALUES)
+                dataSet.column(valueColumn(state), getValue(localState, state));
+
+            for (ApplicationState state : STATES_FOR_VERSIONS)
+                dataSet.column(versionColumn(state), getVersion(localState, state));
+        }
+        return result;
+    }
+
+    /**
+     * Returns the name of the column holding the value of the given application state.
+     * The lowercase conversion uses {@link java.util.Locale#ROOT} so that column names do not
+     * depend on the JVM default locale (e.g. the Turkish dotless i for states containing 'I').
+     *
+     * @param state the application state
+     * @return the lowercased enum name of {@code state}
+     */
+    private static String valueColumn(ApplicationState state)
+    {
+        return state.name().toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Returns the name of the column holding the version of the given application state.
+     *
+     * @param state the application state
+     * @return the value column name of {@code state} suffixed with {@code _version}
+     */
+    private static String versionColumn(ApplicationState state)
+    {
+        return valueColumn(state) + "_version";
+    }
+
+    /**
+     * Return the heartbeat generation of a given {@link EndpointState} or null if {@code localState} is null.
+     *
+     * @param localState a nullable endpoint state
+     * @return the heartbeat generation if available, null otherwise
+     */
+    private Integer getGeneration(EndpointState localState)
+    {
+        return localState == null ? null : localState.getHeartBeatState().getGeneration();
+    }
+
+    /**
+     * Return the heartbeat version of a given {@link EndpointState} or null if {@code localState} is null.
+     *
+     * @param localState a nullable endpoint state
+     * @return the heartbeat version if available, null otherwise
+     */
+    private Integer getHeartBeat(EndpointState localState)
+    {
+        return localState == null ? null : localState.getHeartBeatState().getHeartBeatVersion();
+    }
+
+    /**
+     * Returns the value from the {@link VersionedValue} of a given {@link ApplicationState key}, or null
+     * if {@code localState} is null or the endpoint state holds no {@link VersionedValue} for that key.
+     *
+     * @param localState a nullable endpoint state
+     * @param key        the key to the application state
+     * @return the value, or null if not available
+     */
+    private String getValue(EndpointState localState, ApplicationState key)
+    {
+        VersionedValue value;
+        return localState == null || (value = localState.getApplicationState(key)) == null ? null : value.value;
+    }
+
+    /**
+     * Returns the version from the {@link VersionedValue} of a given {@link ApplicationState key}, or null
+     * if {@code localState} is null or the endpoint state holds no {@link VersionedValue} for that key.
+     *
+     * @param localState a nullable endpoint state
+     * @param key        the key to the application state
+     * @return the version, or null if not available
+     */
+    private Integer getVersion(EndpointState localState, ApplicationState key)
+    {
+        VersionedValue value;
+        return localState == null || (value = localState.getApplicationState(key)) == null ? null : value.version;
+    }
+
+    /**
+     * Builds the {@link TableMetadata} to be provided to the superclass
+     *
+     * @param keyspace the name of the keyspace
+     * @return the TableMetadata class
+     */
+    private static TableMetadata buildTableMetadata(String keyspace)
+    {
+        TableMetadata.Builder builder = TableMetadata.builder(keyspace, TABLE_NAME)
+                                                     .comment(TABLE_COMMENT)
+                                                     .kind(TableMetadata.Kind.VIRTUAL)
+                                                     .partitioner(new LocalPartitioner(InetAddressType.instance))
+                                                     .addPartitionKeyColumn(ADDRESS, InetAddressType.instance)
+                                                     .addClusteringColumn(PORT, Int32Type.instance)
+                                                     .addRegularColumn(HOSTNAME, UTF8Type.instance)
+                                                     .addRegularColumn(GENERATION, Int32Type.instance)
+                                                     .addRegularColumn(HEARTBEAT, Int32Type.instance);
+
+        for (ApplicationState state : STATES_FOR_VALUES)
+            builder.addRegularColumn(valueColumn(state), UTF8Type.instance);
+
+        for (ApplicationState state : STATES_FOR_VERSIONS)
+            builder.addRegularColumn(versionColumn(state), Int32Type.instance);
+
+        return builder.build();
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index fc0f40a..c59d7d5 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -46,6 +46,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
.add(new CQLMetricsTable(VIRTUAL_VIEWS))
.add(new BatchMetricsTable(VIRTUAL_VIEWS))
.add(new StreamingVirtualTable(VIRTUAL_VIEWS))
+ .add(new GossipInfoTable(VIRTUAL_VIEWS))
.build());
}
}
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 05a4f21..cd6d4dc 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -159,8 +159,9 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
@VisibleForTesting
final Set<InetAddressAndPort> seeds = new ConcurrentSkipListSet<>();
- /* map where key is the endpoint and value is the state associated with
the endpoint */
- final ConcurrentMap<InetAddressAndPort, EndpointState> endpointStateMap =
new ConcurrentHashMap<>();
+ /* map where key is the endpoint and value is the state associated with
the endpoint.
+ * This is made public to be consumed by the GossipInfoTable virtual table
*/
+ public final ConcurrentMap<InetAddressAndPort, EndpointState>
endpointStateMap = new ConcurrentHashMap<>();
/* map where key is endpoint and value is timestamp when this endpoint was
removed from
* gossip. We will ignore any gossip regarding these endpoints for
QUARANTINE_DELAY time
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java
b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index cad6a48..3f633cb 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -70,7 +70,7 @@ public class HeartBeatState
return version == EMPTY_VERSION;
}
- int getGeneration()
+ public int getGeneration()
{
return generation;
}
@@ -80,7 +80,7 @@ public class HeartBeatState
version = VersionGenerator.getNextVersion();
}
- int getHeartBeatVersion()
+ public int getHeartBeatVersion()
{
return version;
}
diff --git a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
new file mode 100644
index 0000000..a2bce7d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Map;
+import java.util.Optional;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link GossipInfoTable}: registers the table in a scratch virtual keyspace and
+ * compares the rows returned through CQL with the state held by {@link Gossiper}.
+ */
+public class GossipInfoTableTest extends CQLTester
+{
+    private static final String KS_NAME = "vts";
+
+    @SuppressWarnings("FieldCanBeLocal")
+    private GossipInfoTable table;
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+    }
+
+    /** Registers a fresh {@link GossipInfoTable} under the {@code vts} virtual keyspace before each test. */
+    @Before
+    public void config()
+    {
+        table = new GossipInfoTable(KS_NAME);
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+    }
+
+    @Test
+    public void testSelectAllWhenGossipInfoIsEmpty() throws Throwable
+    {
+        assertEmpty(execute("SELECT * FROM vts.gossip_info"));
+    }
+
+    /**
+     * Starts gossip, selects the single expected row and checks every value and version
+     * column against the corresponding entry of the gossiper's endpoint state map.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testSelectAllWithStateTransitions() throws Throwable
+    {
+        try
+        {
+            requireNetwork(); // triggers gossiper
+
+            UntypedResultSet resultSet = execute("SELECT * FROM vts.gossip_info");
+
+            assertThat(resultSet.size()).isEqualTo(1);
+            assertThat(Gossiper.instance.endpointStateMap.size()).isEqualTo(1);
+
+            Optional<Map.Entry<InetAddressAndPort, EndpointState>> entry = Gossiper.instance.endpointStateMap.entrySet()
+                                                                                                             .stream()
+                                                                                                             .findFirst();
+            assertThat(entry).isNotEmpty();
+
+            UntypedResultSet.Row row = resultSet.one();
+            assertThat(row.getColumns().size()).isEqualTo(62);
+
+            InetAddressAndPort endpoint = entry.get().getKey();
+            EndpointState localState = entry.get().getValue();
+
+            assertThat(endpoint).isNotNull();
+            assertThat(localState).isNotNull();
+            assertThat(row.getInetAddress("address")).isEqualTo(endpoint.getAddress());
+            assertThat(row.getInt("port")).isEqualTo(endpoint.getPort());
+            assertThat(row.getString("hostname")).isEqualTo(endpoint.getHostName());
+            assertThat(row.getInt("generation")).isEqualTo(localState.getHeartBeatState().getGeneration());
+
+            assertValue(row, "status", localState, ApplicationState.STATUS);
+            assertValue(row, "load", localState, ApplicationState.LOAD);
+            assertValue(row, "schema", localState, ApplicationState.SCHEMA);
+            assertValue(row, "dc", localState, ApplicationState.DC);
+            assertValue(row, "rack", localState, ApplicationState.RACK);
+            assertValue(row, "release_version", localState, ApplicationState.RELEASE_VERSION);
+            assertValue(row, "removal_coordinator", localState, ApplicationState.REMOVAL_COORDINATOR);
+            assertValue(row, "internal_ip", localState, ApplicationState.INTERNAL_IP);
+            assertValue(row, "rpc_address", localState, ApplicationState.RPC_ADDRESS);
+            assertValue(row, "severity", localState, ApplicationState.SEVERITY);
+            assertValue(row, "net_version", localState, ApplicationState.NET_VERSION);
+            assertValue(row, "host_id", localState, ApplicationState.HOST_ID);
+            assertValue(row, "rpc_ready", localState, ApplicationState.RPC_READY);
+            assertValue(row, "internal_address_and_port", localState, ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+            assertValue(row, "native_address_and_port", localState, ApplicationState.NATIVE_ADDRESS_AND_PORT);
+            assertValue(row, "status_with_port", localState, ApplicationState.STATUS_WITH_PORT);
+            assertValue(row, "sstable_versions", localState, ApplicationState.SSTABLE_VERSIONS);
+            assertValue(row, "x_11_padding", localState, ApplicationState.X_11_PADDING);
+            assertValue(row, "x1", localState, ApplicationState.X1);
+            assertValue(row, "x2", localState, ApplicationState.X2);
+            assertValue(row, "x3", localState, ApplicationState.X3);
+            assertValue(row, "x4", localState, ApplicationState.X4);
+            assertValue(row, "x5", localState, ApplicationState.X5);
+            assertValue(row, "x6", localState, ApplicationState.X6);
+            assertValue(row, "x7", localState, ApplicationState.X7);
+            assertValue(row, "x8", localState, ApplicationState.X8);
+            assertValue(row, "x9", localState, ApplicationState.X9);
+            assertValue(row, "x10", localState, ApplicationState.X10);
+
+            assertVersion(row, "status_version", localState, ApplicationState.STATUS);
+            assertVersion(row, "load_version", localState, ApplicationState.LOAD);
+            assertVersion(row, "schema_version", localState, ApplicationState.SCHEMA);
+            assertVersion(row, "dc_version", localState, ApplicationState.DC);
+            assertVersion(row, "rack_version", localState, ApplicationState.RACK);
+            assertVersion(row, "release_version_version", localState, ApplicationState.RELEASE_VERSION);
+            assertVersion(row, "removal_coordinator_version", localState, ApplicationState.REMOVAL_COORDINATOR);
+            assertVersion(row, "internal_ip_version", localState, ApplicationState.INTERNAL_IP);
+            assertVersion(row, "rpc_address_version", localState, ApplicationState.RPC_ADDRESS);
+            assertVersion(row, "severity_version", localState, ApplicationState.SEVERITY);
+            assertVersion(row, "net_version_version", localState, ApplicationState.NET_VERSION);
+            assertVersion(row, "host_id_version", localState, ApplicationState.HOST_ID);
+            assertVersion(row, "tokens_version", localState, ApplicationState.TOKENS);
+            assertVersion(row, "rpc_ready_version", localState, ApplicationState.RPC_READY);
+            assertVersion(row, "internal_address_and_port_version", localState, ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+            assertVersion(row, "native_address_and_port_version", localState, ApplicationState.NATIVE_ADDRESS_AND_PORT);
+            assertVersion(row, "status_with_port_version", localState, ApplicationState.STATUS_WITH_PORT);
+            assertVersion(row, "sstable_versions_version", localState, ApplicationState.SSTABLE_VERSIONS);
+            // version columns carry the "_version" suffix; the bare names are the (text) value columns
+            assertVersion(row, "x_11_padding_version", localState, ApplicationState.X_11_PADDING);
+            assertVersion(row, "x1_version", localState, ApplicationState.X1);
+            assertVersion(row, "x2_version", localState, ApplicationState.X2);
+            assertVersion(row, "x3_version", localState, ApplicationState.X3);
+            assertVersion(row, "x4_version", localState, ApplicationState.X4);
+            assertVersion(row, "x5_version", localState, ApplicationState.X5);
+            assertVersion(row, "x6_version", localState, ApplicationState.X6);
+            assertVersion(row, "x7_version", localState, ApplicationState.X7);
+            assertVersion(row, "x8_version", localState, ApplicationState.X8);
+            assertVersion(row, "x9_version", localState, ApplicationState.X9);
+            assertVersion(row, "x10_version", localState, ApplicationState.X10);
+        }
+        finally
+        {
+            // clean up the gossip states
+            Gossiper.instance.clearUnsafe();
+        }
+    }
+
+    /**
+     * If {@code row} has {@code column}, asserts that {@code localState} holds an application state
+     * for {@code key} and that the column equals its value; otherwise asserts that the state is absent.
+     *
+     * @param row        the row returned by the virtual table
+     * @param column     the name of the value column to check
+     * @param localState the endpoint state the row was built from
+     * @param key        the application state backing the column
+     */
+    private void assertValue(UntypedResultSet.Row row, String column, EndpointState localState, ApplicationState key)
+    {
+        if (row.has(column))
+        {
+            assertThat(localState.getApplicationState(key)).as("'%s' is expected to be not-null", key)
+                                                           .isNotNull();
+            assertThat(row.getString(column)).as("'%s' is expected to match column '%s'", key, column)
+                                             .isEqualTo(localState.getApplicationState(key).value);
+        }
+        else
+        {
+            assertThat(localState.getApplicationState(key)).as("'%s' is expected to be null", key)
+                                                           .isNull();
+        }
+    }
+
+    /**
+     * If {@code row} has {@code column}, asserts that {@code localState} holds an application state
+     * for {@code key} and that the column equals its version; otherwise asserts that the state is absent.
+     *
+     * @param row        the row returned by the virtual table
+     * @param column     the name of the version column to check
+     * @param localState the endpoint state the row was built from
+     * @param key        the application state backing the column
+     */
+    private void assertVersion(UntypedResultSet.Row row, String column, EndpointState localState, ApplicationState key)
+    {
+        if (row.has(column))
+        {
+            assertThat(localState.getApplicationState(key)).as("'%s' is expected to be not-null", key)
+                                                           .isNotNull();
+            assertThat(row.getInt(column)).as("'%s' is expected to match column '%s'", key, column)
+                                          .isEqualTo(localState.getApplicationState(key).version);
+        }
+        else
+        {
+            assertThat(localState.getApplicationState(key)).as("'%s' is expected to be null", key)
+                                                           .isNull();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]