This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c232bcb57a664a0a4ca379f03dea6d54023738a2 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Thu Mar 2 14:00:55 2023 +0000 [CEP-21] Start to remove and deprecate gossip functionality WIP commit (i.e. does not compile) beginning the process of removing gossip as the source of truth regarding membership, ownership, topology and data placement. This task will be split over multiple commits. Co-authored-by: Marcus Eriksson <[email protected]> Co-authored-by: Alex Petrov <[email protected]> Co-authored-by: Sam Tunnicliffe <[email protected]> --- .../statements/schema/AlterKeyspaceStatement.java | 24 +- .../apache/cassandra/db/filter/ColumnFilter.java | 98 +-- .../org/apache/cassandra/gms/ApplicationState.java | 38 +- .../org/apache/cassandra/gms/EndpointState.java | 27 +- .../org/apache/cassandra/gms/FailureDetector.java | 12 +- .../org/apache/cassandra/gms/GossipDigestAck.java | 2 +- .../cassandra/gms/GossipDigestAckVerbHandler.java | 11 +- .../cassandra/gms/GossipDigestSynVerbHandler.java | 5 +- src/java/org/apache/cassandra/gms/Gossiper.java | 728 +++++---------------- .../org/apache/cassandra/gms/GossiperEvent.java | 2 +- src/java/org/apache/cassandra/gms/NewGossiper.java | 143 ++++ .../org/apache/cassandra/gms/VersionedValue.java | 96 ++- .../schema/SystemDistributedKeyspace.java | 45 +- .../cassandra/tcm/compatibility/GossipHelper.java | 303 +++++++++ .../apache/cassandra/tracing/TraceKeyspace.java | 14 +- 15 files changed, 774 insertions(+), 774 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java index 9efccd5464..250b18ffed 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java @@ -48,6 +48,7 @@ import org.apache.cassandra.schema.Keyspaces; import 
org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.utils.FBUtilities; @@ -91,7 +92,7 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement newKeyspace.params.validate(keyspaceName, state); newKeyspace.replicationStrategy.validate(); - + validateNoRangeMovements(); validateTransientReplication(keyspace.replicationStrategy, newKeyspace.replicationStrategy); // Because we used to not properly validate unrecognized options, we only log a warning if we find one. @@ -140,14 +141,19 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement if (allow_alter_rf_during_range_movement) return; - Stream<InetAddressAndPort> unreachableNotAdministrativelyInactive = - Gossiper.instance.getUnreachableMembers().stream().filter(endpoint -> !FBUtilities.getBroadcastAddressAndPort().equals(endpoint) && - !Gossiper.instance.isAdministrativelyInactiveState(endpoint)); - Stream<InetAddressAndPort> endpoints = Stream.concat(Gossiper.instance.getLiveMembers().stream(), - unreachableNotAdministrativelyInactive); - List<InetAddressAndPort> notNormalEndpoints = endpoints.filter(endpoint -> !FBUtilities.getBroadcastAddressAndPort().equals(endpoint) && - !Gossiper.instance.getEndpointStateForEndpoint(endpoint).isNormalState()) - .collect(Collectors.toList()); + ClusterMetadata metadata = ClusterMetadata.current(); + NodeId nodeId = metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort()); + Set<InetAddressAndPort> notNormalEndpoints = metadata.directory.states.entrySet().stream().filter(e -> !e.getKey().equals(nodeId)).filter(e -> { + switch (e.getValue()) + { + case BOOTSTRAPPING: + case LEAVING: + case MOVING: + return true; + default: + return 
false; + } + }).map(e -> metadata.directory.endpoint(e.getKey())).collect(Collectors.toSet()); if (!notNormalEndpoints.isEmpty()) { throw new ConfigurationException("Cannot alter RF while some endpoints are not in normal state (no range movements): " + notNormalEndpoints); diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index 48ba7388c7..4c2774a51b 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -31,13 +31,11 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.CellPath; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.CassandraVersion; /** * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected @@ -182,67 +180,6 @@ public abstract class ColumnFilter abstract RegularAndStaticColumns getFetchedColumns(TableMetadata metadata, RegularAndStaticColumns queried); } - /** - * Returns {@code true} if there are pre-4.0-rc2 nodes in the cluster, {@code false} otherwise. - * - * <p>ColumnFilters from 4.0 releases before RC2 wrongly assumed that fetching all regular columns and not - * the static columns was enough. That was not the case for queries that needed to return rows for empty partitions. 
- * See CASSANDRA-16686 for more details.</p> - */ - private static boolean isUpgradingFromVersionLowerThan40RC2() - { - if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0_RC2)) - { - logger.trace("ColumnFilter conversion has been applied so that static columns will not be fetched because there are pre 4.0-rc2 nodes in the cluster"); - return true; - } - return false; - } - - /** - * Returns {@code true} if there are pre-4.0 nodes in the cluster, {@code false} otherwise. - * - * <p>If there pre-4.0 nodes in the cluster all static columns should be fetched along with all regular columns. - * This is due to the fact that this nodes have a different understanding of the fetchAll serialization flag. - * Pre-4.0 the fetchAll flag meant that all the columns regular AND STATIC should be fetched whereas for 4.0 - * nodes it meant that only the regular columns and the queried static columns should be fetched.</p> - */ - private static boolean isUpgradingFromVersionLowerThan40() - { - if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0)) - { - logger.trace("ColumnFilter conversion has been applied so that all static columns will be fetched because there are pre 4.0 nodes in the cluster"); - return true; - } - return false; - } - - /** - * Returns {@code true} if there are pre-3.4 nodes in the cluster, {@code false} otherwise. - * - * When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it - * is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns - * which are not in queried set when fetchAll is enabled. That makes exactly the same filter being - * interpreted in a different way on 3.4- and 3.4+. - * - * Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is - * interpreted the same way on 3.4- because that Cassandra version does not support such filtering. 
- * - * In order to avoid inconsistencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible - * filters when the cluster contains 3.4- nodes. We need to do that by using a wildcard query. - * - * see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415 - */ - private static boolean isUpgradingFromVersionLowerThan34() - { - if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_3_4)) - { - logger.trace("ColumnFilter conversion has been applied so that all columns will be queried because there are pre 3.4 nodes in the cluster"); - return true; - } - return false; - } - /** * A filter that includes all columns for the provided table. */ @@ -271,22 +208,9 @@ public abstract class ColumnFilter RegularAndStaticColumns queried, boolean returnStaticContentOnPartitionWithNoRows) { - // pre CASSANDRA-10657 (3.4-), when fetchAll is enabled, queried columns are not considered at all, and it - // is assumed that all columns are queried. - if (isUpgradingFromVersionLowerThan34()) - { - return new WildCardColumnFilter(metadata.regularAndStaticColumns()); - } - - // pre CASSANDRA-12768 (4.0-) all static columns should be fetched along with all regular columns. - if (isUpgradingFromVersionLowerThan40()) - { - return SelectionColumnFilter.newInstance(FetchingStrategy.ALL_COLUMNS, metadata, queried, null); - } - // pre CASSANDRA-16686 (4.0-RC2-) static columns were not fetched unless queried which led to some wrong // results for some queries - if (!returnStaticContentOnPartitionWithNoRows || isUpgradingFromVersionLowerThan40RC2()) + if (!returnStaticContentOnPartitionWithNoRows) { return SelectionColumnFilter.newInstance(FetchingStrategy.ALL_REGULARS_AND_QUERIED_STATICS_COLUMNS, metadata, queried, null); } @@ -571,20 +495,14 @@ public abstract class ColumnFilter // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null. 
// // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415 - if (queried == null || isUpgradingFromVersionLowerThan34()) + if (queried == null) { return new WildCardColumnFilter(metadata.regularAndStaticColumns()); } - // pre CASSANDRA-12768 (4.0-) all static columns should be fetched along with all regular columns. - if (isUpgradingFromVersionLowerThan40()) - { - return SelectionColumnFilter.newInstance(FetchingStrategy.ALL_COLUMNS, metadata, queried, s); - } - // pre CASSANDRA-16686 (4.0-RC2-) static columns where not fetched unless queried witch lead to some wrong results // for some queries - if (!returnStaticContentOnPartitionWithNoRows || isUpgradingFromVersionLowerThan40RC2()) + if (!returnStaticContentOnPartitionWithNoRows) { return SelectionColumnFilter.newInstance(FetchingStrategy.ALL_REGULARS_AND_QUERIED_STATICS_COLUMNS, metadata, queried, s); } @@ -1041,20 +959,14 @@ public abstract class ColumnFilter { // pre CASSANDRA-10657 (3.4-), when fetchAll is enabled, queried columns are not considered at all, and it // is assumed that all columns are queried. - if (!hasQueried || isUpgradingFromVersionLowerThan34()) + if (!hasQueried) { return new WildCardColumnFilter(fetched); } - // pre CASSANDRA-12768 (4.0-) all static columns should be fetched along with all regular columns. 
- if (isUpgradingFromVersionLowerThan40()) - { - return new SelectionColumnFilter(FetchingStrategy.ALL_COLUMNS, queried, fetched, subSelections); - } - // pre CASSANDRA-16686 (4.0-RC2-) static columns where not fetched unless queried witch lead to some wrong results // for some queries - if (!isFetchAllStatics || isUpgradingFromVersionLowerThan40RC2()) + if (!isFetchAllStatics) { return new SelectionColumnFilter(FetchingStrategy.ALL_REGULARS_AND_QUERIED_STATICS_COLUMNS, queried, fetched, subSelections); } diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index c45d3c2602..4e07eacff2 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -30,22 +30,22 @@ public enum ApplicationState // never remove a state here, ordering matters. @Deprecated STATUS, //Deprecated and unsued in 4.0, stop publishing in 5.0, reclaim in 6.0 LOAD, - SCHEMA, - DC, - RACK, - RELEASE_VERSION, - REMOVAL_COORDINATOR, - @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0 - @Deprecated RPC_ADDRESS, // ^ Same + @Deprecated SCHEMA(false), + @Deprecated DC(true), + @Deprecated RACK(true), + @Deprecated RELEASE_VERSION(true), + @Deprecated REMOVAL_COORDINATOR, + @Deprecated INTERNAL_IP(true), //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0 + @Deprecated RPC_ADDRESS(true), // ^ Same X_11_PADDING, // padding specifically for 1.1 SEVERITY, - NET_VERSION, - HOST_ID, - TOKENS, + NET_VERSION(true), + @Deprecated HOST_ID(true), + @Deprecated TOKENS(true), RPC_READY, // pad to allow adding new states to existing cluster - INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports - NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS + @Deprecated INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports + @Deprecated NATIVE_ADDRESS_AND_PORT, 
//Replacement for RPC_ADDRESS STATUS_WITH_PORT, //Replacement for STATUS /** * The set of sstable versions on this node. This will usually be only the "current" sstable format (the one with @@ -67,5 +67,17 @@ public enum ApplicationState X7, X8, X9, - X10, + X10; + + public final boolean derivedFromState; + + ApplicationState() + { + this(false); + } + + ApplicationState(boolean derivedFromState ) + { + this.derivedFromState = derivedFromState; + } } diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 0886bde156..dd912dcc20 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.NullableSerializer; @@ -69,10 +70,11 @@ public class EndpointState this(new HeartBeatState(other.hbState), new EnumMap<>(other.applicationState.get())); } - EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states) + @VisibleForTesting + public EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states) { hbState = initialHbState; - applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states)); + applicationState = new AtomicReference<>(new EnumMap<>(states)); updateTimestamp = nanoTime(); isAlive = true; } @@ -83,6 +85,17 @@ public class EndpointState return hbState; } + public EndpointState nonDerivable() + { + Map<ApplicationState, VersionedValue> state = new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class); + for (Map.Entry<ApplicationState, VersionedValue> e : 
applicationState.get().entrySet()) + { + if (!e.getKey().derivedFromState) + state.put(e.getKey(), e.getValue()); + } + return new EndpointState(hbState, state); + } + void setHeartBeatState(HeartBeatState newHbState) { updateTimestamp(); @@ -232,11 +245,6 @@ public class EndpointState return rpcState != null && Boolean.parseBoolean(rpcState.value); } - public boolean isNormalState() - { - return getStatus().equals(VersionedValue.STATUS_NORMAL); - } - public String getStatus() { VersionedValue status = getApplicationState(ApplicationState.STATUS_WITH_PORT); @@ -257,10 +265,7 @@ public class EndpointState @Nullable public UUID getSchemaVersion() { - VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA); - return applicationState != null - ? UUID.fromString(applicationState.value) - : null; + return ClusterMetadata.current().schema.getVersion(); } @Nullable diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 5177154df9..51d73cc067 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import javax.management.openmbean.*; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.Replica; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; @@ -243,10 +246,13 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean continue; sb.append(" 
").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n"); } - VersionedValue tokens = endpointState.getApplicationState(ApplicationState.TOKENS); + ClusterMetadata metadata = ClusterMetadata.current(); + NodeId nodeId = metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort()); + List<Token> tokens = metadata.tokenMap.tokens(nodeId); if (tokens != null) { - sb.append(" TOKENS:").append(tokens.version).append(":<hidden>\n"); + // todo, used to only append tokens.version + sb.append(" TOKENS:").append(metadata.epoch.toString()).append("\n"); } else { @@ -294,7 +300,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean // it's worth being defensive here so minor bugs don't cause disproportionate // badness. (See CASSANDRA-1463 for an example). if (epState == null) - logger.error("Unknown endpoint: " + ep, new IllegalArgumentException("")); + logger.error("Unknown endpoint: " + ep, new IllegalArgumentException("Unknown endpoint: " + ep)); return epState != null && epState.isAlive(); } diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java b/src/java/org/apache/cassandra/gms/GossipDigestAck.java index 26494eaba9..07feab29d7 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java @@ -41,7 +41,7 @@ public class GossipDigestAck final List<GossipDigest> gDigestList; final Map<InetAddressAndPort, EndpointState> epStateMap; - GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddressAndPort, EndpointState> epStateMap) + public GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddressAndPort, EndpointState> epStateMap) { this.gDigestList = gDigestList; this.epStateMap = epStateMap; diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index 5fbe7ce0e3..084d41ed89 100644 --- 
a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@ -42,7 +42,7 @@ public class GossipDigestAckVerbHandler extends GossipVerbHandler<GossipDigestAc InetAddressAndPort from = message.from(); if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestAckMessage from {}", from); - if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) + if (!Gossiper.instance.isEnabled() && !NewGossiper.instance.isInShadowRound()) { if (logger.isTraceEnabled()) logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled"); @@ -53,17 +53,14 @@ public class GossipDigestAckVerbHandler extends GossipVerbHandler<GossipDigestAc List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList(); Map<InetAddressAndPort, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap(); logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size()); - - if (Gossiper.instance.isInShadowRound()) + if (NewGossiper.instance.isInShadowRound()) { if (logger.isDebugEnabled()) logger.debug("Received an ack from {}, which may trigger exit from shadow round", from); - // if the ack is completely empty, then we can infer that the respondent is also in a shadow round - Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap); - return; // don't bother doing anything else, we have what we came for + NewGossiper.instance.onAck(epStateMap); + return; } - if (epStateMap.size() > 0) { // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been send. 
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java index abaa39b14c..c2713863cc 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java @@ -44,7 +44,7 @@ public class GossipDigestSynVerbHandler extends GossipVerbHandler<GossipDigestSy InetAddressAndPort from = message.from(); if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestSynMessage from {}", from); - if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) + if (!Gossiper.instance.isEnabled() && !NewGossiper.instance.isInShadowRound()) { if (logger.isTraceEnabled()) logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled"); @@ -66,13 +66,12 @@ public class GossipDigestSynVerbHandler extends GossipVerbHandler<GossipDigestSy } List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests(); - // if the syn comes from a peer performing a shadow round and this node is // also currently in a shadow round, send back a minimal ack. 
This node must // be in the sender's seed list and doing this allows the sender to // differentiate between seeds from which it is partitioned and those which // are in their shadow round - if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound()) + if (!Gossiper.instance.isEnabled() && NewGossiper.instance.isInShadowRound()) { // a genuine syn (as opposed to one from a node currently // doing a shadow round) will always contain > 0 digests diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 0f13f6ae48..86ccee939e 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -18,8 +18,22 @@ package org.apache.cassandra.gms; import java.net.UnknownHostException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -30,47 +44,46 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BooleanSupplier; -import java.util.function.Supplier; import java.util.stream.Collectors; - import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.ImmutableSet; +import 
com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.cassandra.concurrent.*; -import org.apache.cassandra.concurrent.FutureTask; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.NoPayload; -import org.apache.cassandra.net.Verb; -import org.apache.cassandra.utils.CassandraVersion; -import org.apache.cassandra.utils.ExecutorUtils; -import org.apache.cassandra.utils.ExpiringMemoizingSupplier; -import org.apache.cassandra.utils.MBeanWrapper; -import org.apache.cassandra.utils.NoSpamLogger; -import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.FutureTask; +import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.Verb; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.RecomputingSupplier; -import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import org.apache.cassandra.utils.concurrent.NotScheduledFuture; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_GOSSIP_ENDPOINT_REMOVAL; @@ -79,13 +92,15 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_S import static org.apache.cassandra.config.CassandraRelevantProperties.SHUTDOWN_ANNOUNCE_DELAY_IN_MS; import static org.apache.cassandra.config.DatabaseDescriptor.getClusterName; import static org.apache.cassandra.config.DatabaseDescriptor.getPartitionerName; +import static org.apache.cassandra.gms.Gossiper.GossipedWith.CMS; +import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED; +import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.ECHO_REQ; import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN; -import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS; +import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; /** * This module is responsible for Gossiping information for the local endpoint. 
This abstraction @@ -120,12 +135,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean static { SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES); - SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING); - SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE); } - private static final List<String> ADMINISTRATIVELY_INACTIVE_STATES = Arrays.asList(VersionedValue.HIBERNATE, - VersionedValue.REMOVED_TOKEN, - VersionedValue.STATUS_LEFT); + private volatile ScheduledFuture<?> scheduledGossipTask; private static final ReentrantLock taskLock = new ReentrantLock(); public final static int intervalInMillis = 1000; @@ -179,15 +190,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private volatile long lastProcessedMessageAt = currentTimeMillis(); - /** - * This property is initially set to {@code true} which means that we have no information about the other nodes. - * Once all nodes are on at least this node version, it becomes {@code false}, which means that we are not - * upgrading from the previous version (major, minor). - * - * This property and anything that checks it should be removed in 5.0 - */ - private volatile boolean upgradeInProgressPossible = true; - public void clearUnsafe() { unreachableEndpoints.clear(); @@ -199,64 +201,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean seedsInShadowRound.clear(); } - // returns true when the node does not know the existence of other nodes. 
- private static boolean isLoneNode(Map<InetAddressAndPort, EndpointState> epStates) - { - return epStates.isEmpty() || epStates.keySet().equals(Collections.singleton(FBUtilities.getBroadcastAddressAndPort())); - } - - private static final ExpiringMemoizingSupplier.Memoized<CassandraVersion> NO_UPGRADE_IN_PROGRESS = new ExpiringMemoizingSupplier.Memoized<>(null); - private static final ExpiringMemoizingSupplier.NotMemoized<CassandraVersion> CURRENT_NODE_VERSION = new ExpiringMemoizingSupplier.NotMemoized<>(SystemKeyspace.CURRENT_VERSION); - final Supplier<ExpiringMemoizingSupplier.ReturnValue<CassandraVersion>> upgradeFromVersionSupplier = () -> - { - // Once there are no prior version nodes we don't need to keep rechecking - if (!upgradeInProgressPossible) - return NO_UPGRADE_IN_PROGRESS; - - CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION; - - // Skip the round if the gossiper has not started yet - // Otherwise, upgradeInProgressPossible can be set to false wrongly. - // If we don't know any epstate we don't know anything about the cluster. - // If we only know about ourselves, we can assume that version is CURRENT_VERSION - if (!isEnabled() || isLoneNode(endpointStateMap)) - return CURRENT_NODE_VERSION; - - // Check the release version of all the peers it heard of. Not necessary the peer that it has/had contacted with. 
- boolean allHostsHaveKnownVersion = true; - for (InetAddressAndPort host : endpointStateMap.keySet()) - { - if (justRemovedEndpoints.containsKey(host)) - continue; - - CassandraVersion version = getReleaseVersion(host); - - //Raced with changes to gossip state, wait until next iteration - if (version == null) - allHostsHaveKnownVersion = false; - else if (version.compareTo(minVersion) < 0) - minVersion = version; - } - - if (minVersion.compareTo(SystemKeyspace.CURRENT_VERSION) < 0) - return new ExpiringMemoizingSupplier.Memoized<>(minVersion); - - if (!allHostsHaveKnownVersion) - return new ExpiringMemoizingSupplier.NotMemoized<>(minVersion); - - upgradeInProgressPossible = false; - return NO_UPGRADE_IN_PROGRESS; - }; - - private final Supplier<CassandraVersion> upgradeFromVersionMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(upgradeFromVersionSupplier, 1, TimeUnit.MINUTES); - - @VisibleForTesting - public void expireUpgradeFromVersion() - { - upgradeInProgressPossible = true; - ((ExpiringMemoizingSupplier<CassandraVersion>) upgradeFromVersionMemoized).expire(); - } - private static final boolean disableThreadValidation = Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION); private static volatile boolean disableEndpointRemoval = DISABLE_GOSSIP_ENDPOINT_REMOVAL.getBoolean(); @@ -292,6 +236,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } + public Map<InetAddressAndPort, EndpointState> getEndpointStates() + { + return endpointStateMap; + } + private class GossipTask implements Runnable { public void run() @@ -318,7 +267,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean gDigests); Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); /* Gossip to some random live member */ - boolean gossipedToSeed = doGossipToLiveMember(message); + EnumSet<GossipedWith> gossipedWith = doGossipToLiveMember(message); /* Gossip to some unreachable member with some 
probability to check if he is back up */ maybeGossipToUnreachableMember(message); @@ -339,8 +288,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean gossipedToSeed check. See CASSANDRA-150 for more exposition. */ - if (!gossipedToSeed || liveEndpoints.size() < seeds.size()) - maybeGossipToSeed(message); + if (!gossipedWith.contains(SEED) || liveEndpoints.size() < seeds.size()) + gossipedWith.addAll(maybeGossipToSeed(message)); + + if (!gossipedWith.contains(CMS)) + maybeGossipToCMS(message); doStatusCheck(); } @@ -376,17 +328,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean subscribers.add(new IEndpointStateChangeSubscriber() { public void onJoin(InetAddressAndPort endpoint, EndpointState state) - { + { maybeRecompute(state); } public void onAlive(InetAddressAndPort endpoint, EndpointState state) - { + { maybeRecompute(state); } private void maybeRecompute(EndpointState state) - { + { if (state.getApplicationState(ApplicationState.RELEASE_VERSION) != null) minVersionSupplier.recompute(); } @@ -404,36 +356,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean this.lastProcessedMessageAt = timeInMillis; } - public boolean seenAnySeed() - { - for (Map.Entry<InetAddressAndPort, EndpointState> entry : endpointStateMap.entrySet()) - { - if (seeds.contains(entry.getKey())) - return true; - try - { - VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP); - VersionedValue internalIpAndPort = entry.getValue().getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); - InetAddressAndPort endpoint = null; - if (internalIpAndPort != null) - { - endpoint = InetAddressAndPort.getByName(internalIpAndPort.value); - } - else if (internalIp != null) - { - endpoint = InetAddressAndPort.getByName(internalIp.value); - } - if (endpoint != null && seeds.contains(endpoint)) - return true; - } - catch (UnknownHostException e) - { - throw new 
RuntimeException(e); - } - } - return false; - } - /** * Register for interesting state changes. * @@ -487,12 +409,19 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public Set<InetAddressAndPort> getUnreachableTokenOwners() { Set<InetAddressAndPort> tokenOwners = new HashSet<>(); + ClusterMetadata metadata = ClusterMetadata.current(); for (InetAddressAndPort endpoint : unreachableEndpoints.keySet()) { - if (StorageService.instance.getTokenMetadata().isMember(endpoint)) - tokenOwners.add(endpoint); + NodeId nodeId = metadata.directory.peerId(endpoint); + NodeState state = metadata.directory.peerState(nodeId); + switch (state) + { + case JOINED: + case MOVING: + case LEAVING: + tokenOwners.add(endpoint); + } } - return tokenOwners; } @@ -538,7 +467,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean runnable.run(); return; } - FutureTask<?> task = new FutureTask<>(runnable); Stage.GOSSIP.execute(task); try @@ -706,32 +634,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration); } - /** - * Quarantine endpoint specifically for replacement purposes. - * @param endpoint - */ - public void replacementQuarantine(InetAddressAndPort endpoint) - { - // remember, quarantineEndpoint will effectively already add QUARANTINE_DELAY, so this is 2x - quarantineEndpoint(endpoint, currentTimeMillis() + QUARANTINE_DELAY); - GossiperDiagnostics.replacementQuarantine(this, endpoint); - } - - /** - * Remove the Endpoint and evict immediately, to avoid gossiping about this node. - * This should only be called when a token is taken over by a new IP address. 
- * - * @param endpoint The endpoint that has been replaced - */ - public void replacedEndpoint(InetAddressAndPort endpoint) - { - checkProperThreadForStateMutation(); - removeEndpoint(endpoint); - evictFromMembership(endpoint); - replacementQuarantine(endpoint); - GossiperDiagnostics.replacedEndpoint(this, endpoint); - } - /** * The gossip digest is built based on randomization * rather than just looping through the collection of live endpoints. @@ -770,38 +672,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - /** - * This method will begin removing an existing endpoint from the cluster by spoofing its state - * This should never be called unless this coordinator has had 'removenode' invoked - * - * @param endpoint - the endpoint being removed - * @param hostId - the ID of the host being removed - * @param localHostId - my own host ID for replication coordination - */ - public void advertiseRemoving(InetAddressAndPort endpoint, UUID hostId, UUID localHostId) - { - EndpointState epState = endpointStateMap.get(endpoint); - // remember this node's generation - int generation = epState.getHeartBeatState().getGeneration(); - logger.info("Removing host: {}", hostId); - logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY_MILLIS, endpoint); - Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY_MILLIS, TimeUnit.MILLISECONDS); - // make sure it did not change - epState = endpointStateMap.get(endpoint); - if (epState.getHeartBeatState().getGeneration() != generation) - throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it"); - // update the other node's generation to mimic it as if it had changed it itself - logger.info("Advertising removal for {}", endpoint); - epState.updateTimestamp(); // make sure we don't evict it too soon - epState.getHeartBeatState().forceNewerGenerationUnsafe(); - Map<ApplicationState, VersionedValue> states = new 
EnumMap<>(ApplicationState.class); - states.put(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.removingNonlocal(hostId)); - states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); - states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); - epState.addApplicationStates(states); - endpointStateMap.put(endpoint, epState); - } - /** * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN * This should only be called after advertiseRemoving @@ -821,6 +691,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean addExpireTimeForEndpoint(endpoint, expireTime); endpointStateMap.put(endpoint, epState); // ensure at least one gossip round occurs before returning + // todo; do we need this? Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, TimeUnit.MILLISECONDS); } @@ -868,9 +739,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } Collection<Token> tokens = null; + ClusterMetadata metadata = ClusterMetadata.current(); try { - tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint); + NodeId nodeId = metadata.directory.peerId(endpoint); + tokens = metadata.tokenMap.tokens(nodeId); } catch (Throwable th) { @@ -879,7 +752,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (tokens == null || tokens.isEmpty()) { logger.warn("Trying to assassinate an endpoint {} that does not have any tokens assigned. 
This should not have happened, trying to continue with a random token.", address); - tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken()); + tokens = Collections.singletonList(metadata.tokenMap.partitioner().getRandomToken()); } long expireTime = computeExpireTime(); @@ -908,13 +781,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param epSet a set of endpoint from which a random endpoint is chosen. * @return true if the chosen endpoint is also a seed. */ - private boolean sendGossip(Message<GossipDigestSyn> message, Set<InetAddressAndPort> epSet) + private EnumSet<GossipedWith> sendGossip(Message<GossipDigestSyn> message, Set<InetAddressAndPort> epSet) { List<InetAddressAndPort> endpoints = ImmutableList.copyOf(epSet); int size = endpoints.size(); if (size < 1) - return false; + return EnumSet.noneOf(GossipedWith.class); /* Generate a random number from 0 -> size */ int index = (size == 1) ? 0 : random.nextInt(size); InetAddressAndPort to = endpoints.get(index); @@ -923,21 +796,31 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (firstSynSendAt == 0) firstSynSendAt = nanoTime(); MessagingService.instance().send(message, to); + EnumSet<GossipedWith> gossipedWith = EnumSet.noneOf(GossipedWith.class); - boolean isSeed = seeds.contains(to); + if (seeds.contains(to)) + gossipedWith.add(SEED); + if (ClusterMetadata.current().cmsMembers().contains(to)) + gossipedWith.add(CMS); GossiperDiagnostics.sendGossipDigestSyn(this, to); - return isSeed; + return gossipedWith; } /* Sends a Gossip message to a live member and returns true if the recipient was a seed */ - private boolean doGossipToLiveMember(Message<GossipDigestSyn> message) + private EnumSet<GossipedWith> doGossipToLiveMember(Message<GossipDigestSyn> message) { int size = liveEndpoints.size(); if (size == 0) - return false; + return EnumSet.noneOf(GossipedWith.class); return 
sendGossip(message, liveEndpoints); } + enum GossipedWith + { + SEED, + CMS + } + /* Sends a Gossip message to an unreachable member */ private void maybeGossipToUnreachableMember(Message<GossipDigestSyn> message) { @@ -957,19 +840,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } /* Possibly gossip to a seed for facilitating partition healing */ - private void maybeGossipToSeed(Message<GossipDigestSyn> prod) + private EnumSet<GossipedWith> maybeGossipToSeed(Message<GossipDigestSyn> prod) { int size = seeds.size(); + EnumSet<GossipedWith> gossipedWith = EnumSet.noneOf(GossipedWith.class); if (size > 0) { if (size == 1 && seeds.contains(getBroadcastAddressAndPort())) { - return; + return gossipedWith; } if (liveEndpoints.size() == 0) { - sendGossip(prod, seeds); + gossipedWith = sendGossip(prod, seeds); } else { @@ -977,76 +861,35 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean double probability = seeds.size() / (double) (liveEndpoints.size() + unreachableEndpoints.size()); double randDbl = random.nextDouble(); if (randDbl <= probability) - sendGossip(prod, seeds); + gossipedWith = sendGossip(prod, seeds); } } + return gossipedWith; } - public boolean isGossipOnlyMember(InetAddressAndPort endpoint) + private void maybeGossipToCMS(Message<GossipDigestSyn> message) { - EndpointState epState = endpointStateMap.get(endpoint); - if (epState == null) + Set<InetAddressAndPort> cms = ClusterMetadata.current().cmsMembers(); + if (cms.contains(getBroadcastAddressAndPort())) + return; + + double probability = cms.size() / (double) (liveEndpoints.size() + unreachableEndpoints.size()); + double randDbl = random.nextDouble(); + if (randDbl <= probability) { - return false; + logger.trace("Sending GossipDigestSyn to the CMS {}", cms); + sendGossip(message, cms); } - return !isDeadState(epState) && !StorageService.instance.getTokenMetadata().isMember(endpoint); } - /** - * Check if this node can safely be 
started and join the ring. - * If the node is bootstrapping, examines gossip state for any previous status to decide whether - * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID - * that the node itself has (obtained by reading from system.local or generated if not present) - * with the host ID obtained from gossip for the endpoint address (if any). This latter case - * prevents a non-bootstrapping, new node from being started with the same address of a - * previously started, but currently down predecessor. - * - * @param endpoint - the endpoint to check - * @param localHostUUID - the host id to check - * @param isBootstrapping - whether the node intends to bootstrap when joining - * @param epStates - endpoint states in the cluster - * @return true if it is safe to start the node, false otherwise - */ - public boolean isSafeForStartup(InetAddressAndPort endpoint, UUID localHostUUID, boolean isBootstrapping, - Map<InetAddressAndPort, EndpointState> epStates) + public boolean isGossipOnlyMember(InetAddressAndPort endpoint) { - EndpointState epState = epStates.get(endpoint); - // if there's no previous state, we're good + EndpointState epState = endpointStateMap.get(endpoint); if (epState == null) - return true; - - String status = getGossipStatus(epState); - - if (status.equals(VersionedValue.HIBERNATE) - && !SystemKeyspace.bootstrapComplete()) { - logger.warn("A node with the same IP in hibernate status was detected. 
Was a replacement already attempted?"); return false; } - - //the node was previously removed from the cluster - if (isDeadState(epState)) - return true; - - if (isBootstrapping) - { - // these states are not allowed to join the cluster as it would not be safe - final List<String> unsafeStatuses = new ArrayList<String>() - {{ - add(""); // failed bootstrap but we did start gossiping - add(VersionedValue.STATUS_NORMAL); // node is legit in the cluster or it was stopped with kill -9 - add(VersionedValue.SHUTDOWN); // node was shutdown - }}; - return !unsafeStatuses.contains(status); - } - else - { - // if the previous UUID matches what we currently have (i.e. what was read from - // system.local at startup), then we're good to start up. Otherwise, something - // is amiss and we need to replace the previous node - VersionedValue previous = epState.getApplicationState(ApplicationState.HOST_ID); - return UUID.fromString(previous.value).equals(localHostUUID); - } + return !isDeadState(epState) && !ClusterMetadata.current().directory.allAddresses().contains(endpoint); } @VisibleForTesting @@ -1072,6 +915,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } + ClusterMetadata metadata = ClusterMetadata.current(); Set<InetAddressAndPort> eps = endpointStateMap.keySet(); for (InetAddressAndPort endpoint : eps) { @@ -1097,7 +941,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean // to make sure that the previous read data was correct logger.info("Race condition marking {} as a FatClient; ignoring", endpoint); return; - } + } removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay evictFromMembership(endpoint); // can get rid of the state immediately }); @@ -1106,7 +950,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean // check for dead state removal long expireTime = getExpireTimeForEndpoint(endpoint); if (!epState.isAlive() && (now > 
expireTime) - && (!StorageService.instance.getTokenMetadata().isMember(endpoint))) + && (!metadata.directory.allAddresses().contains(endpoint))) { if (logger.isDebugEnabled()) { @@ -1195,16 +1039,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return lastProcessedMessageAt; } - public UUID getHostId(InetAddressAndPort endpoint) - { - return getHostId(endpoint, endpointStateMap); - } - - public UUID getHostId(InetAddressAndPort endpoint, Map<InetAddressAndPort, EndpointState> epStates) - { - return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value); - } - /** * The value for the provided application state for the provided endpoint as currently known by this Gossip instance. * @@ -1271,17 +1105,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return reqdEndpointState; } - /** - * determine which endpoint started up earlier - */ - public int compareEndpointStartup(InetAddressAndPort addr1, InetAddressAndPort addr2) - { - EndpointState ep1 = getEndpointStateForEndpoint(addr1); - EndpointState ep2 = getEndpointStateForEndpoint(addr2); - assert ep1 != null && ep2 != null; - return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration(); - } - public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap) { for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet()) @@ -1389,7 +1212,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } /** - * Used by {@link #markDead(InetAddressAndPort, EndpointState)} and {@link #addSavedEndpoint(InetAddressAndPort)} + * Used by {@link #markDead(InetAddressAndPort, EndpointState)} * to register a endpoint as dead. This method is "silent" to avoid triggering listeners, diagnostics, or logs * on startup via addSavedEndpoint. 
*/ @@ -1415,6 +1238,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean EndpointState localEpState = endpointStateMap.get(ep); if (!isDeadState(epState)) { + // confusing log message, epState status might still be 'shutdown' - keeping if anyone is using it for automation + // we're not actually marking it as up until we get the echo request response in markAlive below if (localEpState != null) logger.info("Node {} has restarted, now UP", ep); else @@ -1472,23 +1297,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return SILENT_SHUTDOWN_STATES.contains(status); } - public boolean isAdministrativelyInactiveState(EndpointState epState) - { - String status = getGossipStatus(epState); - if (status.isEmpty()) - return false; - - return ADMINISTRATIVELY_INACTIVE_STATES.contains(status); - } - - public boolean isAdministrativelyInactiveState(InetAddressAndPort endpoint) - { - EndpointState epState = getEndpointStateForEndpoint(endpoint); - if (epState == null) - return true; // if the end point cannot be found, treat as inactive - return isAdministrativelyInactiveState(epState); - } - public static String getGossipStatus(EndpointState epState) { if (epState == null) @@ -1582,11 +1390,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap) { checkProperThreadForStateMutation(); - boolean hasMajorVersion3Nodes = hasMajorVersion3Nodes(); for (Entry<InetAddressAndPort, EndpointState> entry : order(epStateMap)) { InetAddressAndPort ep = entry.getKey(); - if (ep.equals(getBroadcastAddressAndPort()) && !isInShadowRound()) + if (ep.equals(getBroadcastAddressAndPort())) continue; if (justRemovedEndpoints.containsKey(ep)) { @@ -1597,8 +1404,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean EndpointState localEpStatePtr = endpointStateMap.get(ep); EndpointState remoteState = 
entry.getValue(); - if (!hasMajorVersion3Nodes) - remoteState.removeMajorVersion3LegacyApplicationStates(); + remoteState.removeMajorVersion3LegacyApplicationStates(); /* If state does not exist just add it. If it does then add it if the remote generation is greater. @@ -1633,7 +1439,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (remoteMaxVersion > localMaxVersion) { // apply states, but do not notify since there is no major change - applyNewStates(ep, localEpStatePtr, remoteState, hasMajorVersion3Nodes); + applyNewStates(ep, localEpStatePtr, remoteState.nonDerivable()); } else if (logger.isTraceEnabled()) logger.trace("Ignoring remote version {} <= {} for {}", remoteMaxVersion, localMaxVersion, ep); @@ -1656,7 +1462,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - private void applyNewStates(InetAddressAndPort addr, EndpointState localState, EndpointState remoteState, boolean hasMajorVersion3Nodes) + private void applyNewStates(InetAddressAndPort addr, EndpointState localState, EndpointState remoteState) { // don't assert here, since if the node restarts the version will go back to zero int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); @@ -1668,7 +1474,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states(); assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); - Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> { // filter out the states that are already up to date (has the same or higher version) VersionedValue local = localState.getApplicationState(entry.getKey()); @@ -1683,10 +1488,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } localState.addApplicationStates(updatedStates); - - // get rid of legacy fields once 
the cluster is not in mixed mode - if (!hasMajorVersion3Nodes) - localState.removeMajorVersion3LegacyApplicationStates(); + localState.removeMajorVersion3LegacyApplicationStates(); // need to run STATUS or STATUS_WITH_PORT first to handle BOOT_REPLACE correctly (else won't be a member, so TOKENS won't be processed) for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates) @@ -1800,13 +1602,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (null != hostId) { state.addApplicationState(ApplicationState.HOST_ID, - StorageService.instance.valueFactory.hostId(hostId)); + StorageService.instance.valueFactory.hostId(hostId)); } Set<Token> tokens = SystemKeyspace.loadTokens().get(endpoint); if (null != tokens && !tokens.isEmpty()) { state.addApplicationState(ApplicationState.TOKENS, - StorageService.instance.valueFactory.tokens(tokens)); + StorageService.instance.valueFactory.tokens(tokens)); } } map.put(endpoint, state); @@ -1908,97 +1710,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean TimeUnit.MILLISECONDS); } - public synchronized Map<InetAddressAndPort, EndpointState> doShadowRound() - { - return doShadowRound(Collections.EMPTY_SET); - } - - /** - * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the - * map return value, instead of endpointStateMap. - * - * Used when preparing to join the ring: - * <ul> - * <li>when replacing a node, to get and assume its tokens</li> - * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li> - * </ul> - * - * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared - * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddressAndPort, boolean, Map)}. 
This will update - * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the - * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at - * the same time. - * - * @param peers Additional peers to try gossiping with. - * @return endpoint states gathered during shadow round or empty map - */ - public synchronized Map<InetAddressAndPort, EndpointState> doShadowRound(Set<InetAddressAndPort> peers) - { - buildSeedsList(); - // it may be that the local address is the only entry in the seed + peers - // list in which case, attempting a shadow round is pointless - if (seeds.isEmpty() && peers.isEmpty()) - return endpointShadowStateMap; - - boolean isSeed = DatabaseDescriptor.getSeeds().contains(getBroadcastAddressAndPort()); - // We double RING_DELAY if we're not a seed to increase chance of successful startup during a full cluster bounce, - // giving the seeds a chance to startup before we fail the shadow round - int shadowRoundDelay = isSeed ? StorageService.RING_DELAY_MILLIS : StorageService.RING_DELAY_MILLIS * 2; - seedsInShadowRound.clear(); - endpointShadowStateMap.clear(); - // send a completely empty syn - List<GossipDigest> gDigests = new ArrayList<>(); - GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), gDigests); - Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); - - inShadowRound = true; - boolean includePeers = false; - int slept = 0; - try - { - while (true) - { - if (slept % 5000 == 0) - { // CASSANDRA-8072, retry at the beginning and every 5 seconds - logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds); - - for (InetAddressAndPort seed : seeds) - MessagingService.instance().send(message, seed); - - // Send to any peers we already know about, but only if a seed didn't respond. 
- if (includePeers) - { - logger.trace("Sending shadow round GOSSIP DIGEST SYN to known peers {}", peers); - for (InetAddressAndPort peer : peers) - MessagingService.instance().send(message, peer); - } - includePeers = true; - } - - Thread.sleep(1000); - if (!inShadowRound) - break; - - slept += 1000; - if (slept > shadowRoundDelay) - { - // if we got here no peers could be gossiped to. If we're a seed that's OK, but otherwise we stop. See CASSANDRA-13851 - if (!isSeed) - throw new RuntimeException("Unable to gossip with any peers"); - - inShadowRound = false; - break; - } - } - } - catch (InterruptedException e) - { - throw new UncheckedInterruptedException(e); - } - - return ImmutableMap.copyOf(endpointShadowStateMap); - } - @VisibleForTesting void buildSeedsList() { @@ -2086,45 +1797,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean epstate.getHeartBeatState().forceNewerGenerationUnsafe(); } - - /** - * Add an endpoint we knew about previously, but whose state is unknown - */ - public void addSavedEndpoint(InetAddressAndPort ep) - { - checkProperThreadForStateMutation(); - if (ep.equals(getBroadcastAddressAndPort())) - { - logger.debug("Attempt to add self as saved endpoint"); - return; - } - - //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on) - EndpointState epState = endpointStateMap.get(ep); - if (epState != null) - { - logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, epState); - epState.setHeartBeatState(HeartBeatState.empty()); - } - else - { - epState = new EndpointState(HeartBeatState.empty()); - logger.info("Adding {} as there was no previous epState; new state is {}", ep, epState); - } - - epState.markDead(); - endpointStateMap.put(ep, epState); - silentlyMarkDead(ep, epState); - if (logger.isTraceEnabled()) - logger.trace("Adding saved endpoint {} {}", ep, epState.getHeartBeatState().getGeneration()); - } - private void 
addLocalApplicationStateInternal(ApplicationState state, VersionedValue value) { assert taskLock.isHeldByCurrentThread(); InetAddressAndPort epAddr = getBroadcastAddressAndPort(); EndpointState epState = endpointStateMap.get(epAddr); - assert epState != null : "Can't find endpoint state for " + epAddr; + // todo; this can be null during startup log replay when bootstrapping + // - we would have no state for ourselves + if (epState == null) return; // Fire "before change" notifications: doBeforeChangeNotifications(epAddr, epState, state, value); // Notifications may have taken some time, so preventively raise the version @@ -2183,78 +1863,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } - public boolean sufficientForStartupSafetyCheck(Map<InetAddressAndPort, EndpointState> epStateMap) - { - // it is possible for a previously queued ack to be sent to us when we come back up in shadow - EndpointState localState = epStateMap.get(FBUtilities.getBroadcastAddressAndPort()); - // return false if response doesn't contain state necessary for safety check - return localState == null || isDeadState(localState) || localState.containsApplicationState(ApplicationState.HOST_ID); - } - - protected void maybeFinishShadowRound(InetAddressAndPort respondent, boolean isInShadowRound, Map<InetAddressAndPort, EndpointState> epStateMap) - { - if (inShadowRound) - { - if (!isInShadowRound) - { - if (!sufficientForStartupSafetyCheck(epStateMap)) - { - logger.debug("Not exiting shadow round because received ACK with insufficient states {} -> {}", - FBUtilities.getBroadcastAddressAndPort(), epStateMap.get(FBUtilities.getBroadcastAddressAndPort())); - return; - } - - if (!seeds.contains(respondent)) - logger.warn("Received an ack from {}, who isn't a seed. Ensure your seed list includes a live node. 
Exiting shadow round", - respondent); - logger.debug("Received a regular ack from {}, can now exit shadow round", respondent); - // respondent sent back a full ack, so we can exit our shadow round - endpointShadowStateMap.putAll(epStateMap); - inShadowRound = false; - seedsInShadowRound.clear(); - } - else - { - // respondent indicates it too is in a shadow round, if all seeds - // are in this state then we can exit our shadow round. Otherwise, - // we keep retrying the SR until one responds with a full ACK or - // we learn that all seeds are in SR. - logger.debug("Received an ack from {} indicating it is also in shadow round", respondent); - seedsInShadowRound.add(respondent); - if (seedsInShadowRound.containsAll(seeds)) - { - logger.debug("All seeds are in a shadow round, clearing this node to exit its own"); - inShadowRound = false; - seedsInShadowRound.clear(); - } - } - } - } - - public boolean isInShadowRound() - { - return inShadowRound; - } - - /** - * Creates a new dead {@link EndpointState} that is {@link EndpointState#isEmptyWithoutStatus() empty}. This is used during - * host replacement for edge cases where the seed notified that the endpoint was empty, so need to add such state - * into gossip explicitly (as empty endpoints are not gossiped outside of the shadow round). - * - * see CASSANDRA-16213 - */ - public void initializeUnreachableNodeUnsafe(InetAddressAndPort addr) - { - EndpointState state = new EndpointState(HeartBeatState.empty()); - state.markDead(); - EndpointState oldState = endpointStateMap.putIfAbsent(addr, state); - if (null != oldState) - { - throw new RuntimeException("Attempted to initialize endpoint state for unreachable node, " + - "but found existing endpoint state for it."); - } - } - @VisibleForTesting public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int generationNbr) { @@ -2346,6 +1954,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return state != null ? 
state.getSchemaVersion() : null; } + // TODO: (TM/alexp): we do not need to wait for gossip to settle anymore, since main keys are now coming from TM public static void waitToSettle() { int forceAfter = GOSSIPER_SKIP_WAITING_TO_SETTLE.getInt(); @@ -2369,7 +1978,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean totalPolls++; if (currentSize == epSize) { - logger.debug("Gossip looks settled."); + logger.debug("Gossip looks settled. {}", Gossiper.instance.endpointStateMap); numOkay++; } else @@ -2398,6 +2007,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param unit TimeUnit of maxWait * @return true if agreement was reached, false if not */ + // TODO: (TM/alexp): we do not need to wait for schema convergence for the purpose of view building; + // we will rely on different mechanisms for propagating mutations correctly public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition) { int waited = 0; @@ -2419,39 +2030,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - /** - * Returns {@code false} only if the information about the version of each node in the cluster is available and - * ALL the nodes are on 4.0+ (regardless of the patch version). 
- */ - public boolean hasMajorVersion3Nodes() - { - return isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0) || // this is quite obvious - // however if we discovered only nodes at current version so far (in particular only this node), - // but still there are nodes with unknown version, we also want to report that the cluster may have nodes at 3.x - upgradeInProgressPossible && !isUpgradingFromVersionLowerThan(SystemKeyspace.CURRENT_VERSION.familyLowerBound.get()); - } - - /** - * Returns {@code true} if there are nodes on version lower than the provided version - */ - public boolean isUpgradingFromVersionLowerThan(CassandraVersion referenceVersion) - { - return isUpgradingFromVersionLowerThanC17653(referenceVersion).left; - } - - /* TODO: Aux method for debug purposes on fixing C17653. To be removed*/ - @VisibleForTesting - public Pair<Boolean, CassandraVersion> isUpgradingFromVersionLowerThanC17653(CassandraVersion referenceVersion) - { - CassandraVersion v = upgradeFromVersionMemoized.get(); - if (CassandraVersion.NULL_VERSION.equals(v) && scheduledGossipTask == null) - return Pair.create(false, v); - - boolean res = v != null && v.compareTo(referenceVersion) < 0; - - return Pair.create(res, v); - } - private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes) { UUID expectedVersion = null; @@ -2563,27 +2141,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean firstSynSendAt = 1; } - public Collection<InetAddressAndPort> unsafeClearRemoteState() - { - List<InetAddressAndPort> removed = new ArrayList<>(); - for (InetAddressAndPort ep : endpointStateMap.keySet()) - { - if (ep.equals(getBroadcastAddressAndPort())) - continue; - - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onRemove(ep); - - removed.add(ep); - } - this.endpointStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); - 
this.endpointShadowStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); - this.expireTimeEndpointMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); - this.justRemovedEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); - this.unreachableEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); - return removed; - } - public void unsafeGossipWith(InetAddressAndPort ep) { /* Update the local heartbeat counter. */ @@ -2623,4 +2180,49 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean Message<GossipDigestAck2> message = Message.out(Verb.GOSSIP_DIGEST_ACK2, digestAck2Message); MessagingService.instance().send(message, ep); } + + public void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState epstate) + { + taskLock.lock(); + try + { + checkProperThreadForStateMutation(); + assert !endpoint.equals(getBroadcastAddressAndPort()) || epstate.getHeartBeatState().getGeneration() > 0 : + "We should not update epstates with generation = 0 for the local host"; + EndpointState old = endpointStateMap.get(endpoint); + if (old == null) + endpointStateMap.put(endpoint, epstate); + else + old.addApplicationStates(epstate.states()); + + if (!getBroadcastAddressAndPort().equals(endpoint)) + { + // don't consider it a major state change if the generation is 0 - this means we have only added it locally for a remote node + if (epstate.getHeartBeatState().getGeneration() > 0 && + (old == null || old.getHeartBeatState().getGeneration() < epstate.getHeartBeatState().getGeneration())) + handleMajorStateChange(endpoint, epstate); + } + } + finally + { + taskLock.unlock(); + } + } + + public void triggerRoundWithCMS() + { + ClusterMetadata metadata = ClusterMetadata.current(); + Set<InetAddressAndPort> cms = metadata.cmsMembers(); + if (!cms.contains(getBroadcastAddressAndPort())) + { + logger.debug("Triggering gossip round with CMS {}", 
metadata.epoch); + final List<GossipDigest> gDigests = new ArrayList<>(); + Gossiper.instance.makeRandomGossipDigest(gDigests); + GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), + getPartitionerName(), + gDigests); + Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); + sendGossip(message, cms); + } + } } diff --git a/src/java/org/apache/cassandra/gms/GossiperEvent.java b/src/java/org/apache/cassandra/gms/GossiperEvent.java index 71fee7c991..44d4ad6247 100644 --- a/src/java/org/apache/cassandra/gms/GossiperEvent.java +++ b/src/java/org/apache/cassandra/gms/GossiperEvent.java @@ -77,7 +77,7 @@ public final class GossiperEvent extends DiagnosticEvent this.localState = localState; this.endpointStateMap = gossiper.getEndpointStateMap(); - this.inShadowRound = gossiper.isInShadowRound(); + this.inShadowRound = false; // todo; gossiper.isInShadowRound(); this.justRemovedEndpoints = gossiper.getJustRemovedEndpoints(); this.lastProcessedMessageAt = gossiper.getLastProcessedMessageAt(); this.liveEndpoints = gossiper.getLiveMembers(); diff --git a/src/java/org/apache/cassandra/gms/NewGossiper.java b/src/java/org/apache/cassandra/gms/NewGossiper.java new file mode 100644 index 0000000000..9fb889b286 --- /dev/null +++ b/src/java/org/apache/cassandra/gms/NewGossiper.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.concurrent.Accumulator; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Promise; + +import static org.apache.cassandra.config.DatabaseDescriptor.getClusterName; +import static org.apache.cassandra.config.DatabaseDescriptor.getPartitionerName; +import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN; +import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; + +public class NewGossiper +{ + private static final Logger logger = LoggerFactory.getLogger(NewGossiper.class); + public static final NewGossiper instance = new NewGossiper(); + + private volatile ShadowRoundHandler handler; + + public Map<InetAddressAndPort, EndpointState> doShadowRound() + { + Set<InetAddressAndPort> peers = new HashSet<>(SystemKeyspace.loadHostIds().keySet()); + if (peers.isEmpty()) + peers.addAll(DatabaseDescriptor.getSeeds()); + + 
ShadowRoundHandler shadowRoundHandler = new ShadowRoundHandler(peers); + handler = shadowRoundHandler; + + try + { + return shadowRoundHandler.doShadowRound().get(30, TimeUnit.SECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException e) + { + throw new RuntimeException(e); + } + } + + + public boolean isInShadowRound() + { + ShadowRoundHandler srh = handler; + return srh != null && !srh.isDone(); + } + + void onAck( Map<InetAddressAndPort, EndpointState> epStateMap) + { + ShadowRoundHandler srh = handler; + if (srh != null && !srh.isDone()) + srh.onAck(epStateMap); + } + + public static class ShadowRoundHandler + { + private volatile boolean isDone = false; + private final Set<InetAddressAndPort> peers; + private final Accumulator<Map<InetAddressAndPort, EndpointState>> responses; + private final int requiredResponses; + private final Promise<Map<InetAddressAndPort, EndpointState>> promise = new AsyncPromise<>(); + + public ShadowRoundHandler(Set<InetAddressAndPort> peers) + { + this.peers = peers; + requiredResponses = Math.max(peers.size() / 10, 1); + responses = new Accumulator<>(requiredResponses); + } + + public boolean isDone() + { + return isDone; + } + + public Promise<Map<InetAddressAndPort, EndpointState>> doShadowRound() + { + // send a completely empty syn + GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), new ArrayList<>()); + Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); + + logger.info("Sending shadow round GOSSIP DIGEST SYN to known peers {}", peers); + for (InetAddressAndPort peer : peers) + { + if (!peer.equals(getBroadcastAddressAndPort())) + MessagingService.instance().send(message, peer); + } + return promise; + } + + public void onAck(Map<InetAddressAndPort, EndpointState> epStateMap) + { + if (!isDone) + { + if (!epStateMap.isEmpty()) + responses.add(epStateMap); + + logger.debug("Received {} responses. 
{} required.", responses.size(), requiredResponses); + if (responses.size() >= requiredResponses) + { + isDone = true; + promise.setSuccess(merge(responses.snapshot())); + } + } + } + + private Map<InetAddressAndPort, EndpointState> merge(Collection<Map<InetAddressAndPort, EndpointState>> snapshot) + { + return snapshot.iterator().next(); // todo; actually merge/verify the responses + } + } +} diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index d76f3011f4..f6c02258de 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.util.Collection; import java.util.Set; import java.util.UUID; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.ISO_8859_1; @@ -69,20 +70,31 @@ public class VersionedValue implements Comparable<VersionedValue> public final static String DELIMITER_STR = new String(new char[]{ DELIMITER }); // values for ApplicationState.STATUS + @Deprecated public final static String STATUS_BOOTSTRAPPING = "BOOT"; + @Deprecated public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE"; + @Deprecated public final static String STATUS_NORMAL = "NORMAL"; + @Deprecated public final static String STATUS_LEAVING = "LEAVING"; + @Deprecated public final static String STATUS_LEFT = "LEFT"; + @Deprecated public final static String STATUS_MOVING = "MOVING"; + @Deprecated public final static String REMOVING_TOKEN = "removing"; + @Deprecated public final static String REMOVED_TOKEN = "removed"; + @Deprecated public final static String HIBERNATE = "hibernate"; + @Deprecated public final static String SHUTDOWN = "shutdown"; // values for ApplicationState.REMOVAL_COORDINATOR + @Deprecated public final static String REMOVAL_COORDINATOR = "REMOVER"; public static Set<String> BOOTSTRAPPING_STATUS 
= ImmutableSet.of(STATUS_BOOTSTRAPPING, STATUS_BOOTSTRAPPING_REPLACE); @@ -137,10 +149,17 @@ public class VersionedValue implements Comparable<VersionedValue> public static class VersionedValueFactory { final IPartitioner partitioner; + private final Supplier<Integer> versionSupplier; public VersionedValueFactory(IPartitioner partitioner) + { + this(partitioner, VersionGenerator::getNextVersion); + } + + public VersionedValueFactory(IPartitioner partitioner, Supplier<Integer> versionSupplier) { this.partitioner = partitioner; + this.versionSupplier = versionSupplier; } public VersionedValue cloneWithHigherVersion(VersionedValue value) @@ -151,24 +170,29 @@ public class VersionedValue implements Comparable<VersionedValue> @Deprecated public VersionedValue bootReplacing(InetAddress oldNode) { - return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress())); + return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()), versionSupplier.get()); } + @Deprecated public VersionedValue bootReplacingWithPort(InetAddressAndPort oldNode) { - return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddressAndPort())); + return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddressAndPort()), versionSupplier.get()); } + @Deprecated public VersionedValue bootstrapping(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING, - makeTokenString(tokens))); + makeTokenString(tokens)), + versionSupplier.get()); } + @Deprecated public VersionedValue normal(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_NORMAL, - makeTokenString(tokens))); + makeTokenString(tokens)), + versionSupplier.get()); } private String makeTokenString(Collection<Token> tokens) @@ -178,7 +202,8 @@ public class VersionedValue implements 
Comparable<VersionedValue> public VersionedValue load(double load) { - return new VersionedValue(String.valueOf(load)); + return new VersionedValue(String.valueOf(load), + versionSupplier.get()); } public VersionedValue diskUsage(String state) @@ -188,22 +213,27 @@ public class VersionedValue implements Comparable<VersionedValue> public VersionedValue schema(UUID newVersion) { - return new VersionedValue(newVersion.toString()); + return new VersionedValue(newVersion.toString(), versionSupplier.get()); } + @Deprecated public VersionedValue leaving(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_LEAVING, - makeTokenString(tokens))); + makeTokenString(tokens)), + versionSupplier.get()); } + @Deprecated public VersionedValue left(Collection<Token> tokens, long expireTime) { return new VersionedValue(versionString(VersionedValue.STATUS_LEFT, makeTokenString(tokens), - Long.toString(expireTime))); + Long.toString(expireTime)), + versionSupplier.get()); } + @Deprecated @VisibleForTesting public VersionedValue left(Collection<Token> tokens, long expireTime, int generation) { @@ -212,16 +242,20 @@ public class VersionedValue implements Comparable<VersionedValue> Long.toString(expireTime)), generation); } + @Deprecated public VersionedValue moving(Token token) { - return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token), + versionSupplier.get()); } + @Deprecated public VersionedValue hostId(UUID hostId) { - return new VersionedValue(hostId.toString()); + return new VersionedValue(hostId.toString(), versionSupplier.get()); } + @Deprecated public VersionedValue tokens(Collection<Token> tokens) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -234,101 +268,105 @@ public class VersionedValue implements 
Comparable<VersionedValue> { throw new RuntimeException(e); } - return new VersionedValue(new String(bos.toByteArray(), ISO_8859_1)); + return new VersionedValue(new String(bos.toByteArray(), ISO_8859_1), versionSupplier.get()); } + @Deprecated public VersionedValue removingNonlocal(UUID hostId) { - return new VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString())); + return new VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString()), versionSupplier.get()); } + @Deprecated public VersionedValue removedNonlocal(UUID hostId, long expireTime) { - return new VersionedValue(versionString(VersionedValue.REMOVED_TOKEN, hostId.toString(), Long.toString(expireTime))); + return new VersionedValue(versionString(VersionedValue.REMOVED_TOKEN, hostId.toString(), Long.toString(expireTime)), versionSupplier.get()); } + @Deprecated public VersionedValue removalCoordinator(UUID hostId) { - return new VersionedValue(versionString(VersionedValue.REMOVAL_COORDINATOR, hostId.toString())); + return new VersionedValue(versionString(VersionedValue.REMOVAL_COORDINATOR, hostId.toString()), versionSupplier.get()); } + @Deprecated public VersionedValue hibernate(boolean value) { - return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value); + return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value, versionSupplier.get()); } public VersionedValue rpcReady(boolean value) { - return new VersionedValue(String.valueOf(value)); + return new VersionedValue(String.valueOf(value), versionSupplier.get()); } public VersionedValue shutdown(boolean value) { - return new VersionedValue(VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + value); + return new VersionedValue(VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + value, versionSupplier.get()); } public VersionedValue datacenter(String dcId) { - return new VersionedValue(dcId); + return new VersionedValue(dcId, versionSupplier.get()); } public 
VersionedValue rack(String rackId) { - return new VersionedValue(rackId); + return new VersionedValue(rackId, versionSupplier.get()); } public VersionedValue rpcaddress(InetAddress endpoint) { - return new VersionedValue(endpoint.getHostAddress()); + return new VersionedValue(endpoint.getHostAddress(), versionSupplier.get()); } public VersionedValue nativeaddressAndPort(InetAddressAndPort address) { - return new VersionedValue(address.getHostAddressAndPort()); + return new VersionedValue(address.getHostAddressAndPort(), versionSupplier.get()); } public VersionedValue releaseVersion() { - return new VersionedValue(FBUtilities.getReleaseVersionString()); + return new VersionedValue(FBUtilities.getReleaseVersionString(), versionSupplier.get()); } @VisibleForTesting public VersionedValue releaseVersion(String version) { - return new VersionedValue(version); + return new VersionedValue(version, versionSupplier.get()); } @VisibleForTesting public VersionedValue networkVersion(int version) { - return new VersionedValue(String.valueOf(version)); + return new VersionedValue(String.valueOf(version), versionSupplier.get()); } public VersionedValue networkVersion() { - return new VersionedValue(String.valueOf(MessagingService.current_version)); + return new VersionedValue(String.valueOf(MessagingService.current_version), versionSupplier.get()); } public VersionedValue internalIP(InetAddress private_ip) { - return new VersionedValue(private_ip.getHostAddress()); + return new VersionedValue(private_ip.getHostAddress(), versionSupplier.get()); } public VersionedValue internalAddressAndPort(InetAddressAndPort private_ip_and_port) { - return new VersionedValue(private_ip_and_port.getHostAddressAndPort()); + return new VersionedValue(private_ip_and_port.getHostAddressAndPort(), versionSupplier.get()); } public VersionedValue severity(double value) { - return new VersionedValue(String.valueOf(value)); + return new VersionedValue(String.valueOf(value), versionSupplier.get()); } public 
VersionedValue sstableVersions(Set<VersionAndType> versions) { return new VersionedValue(versions.stream() .map(VersionAndType::toString) - .collect(Collectors.joining(","))); + .collect(Collectors.joining(",")), versionSupplier.get()); } } diff --git a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java index dc40093d4d..563dedf935 100644 --- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java @@ -225,10 +225,6 @@ public final class SystemDistributedKeyspace public static void startRepairs(TimeUUID id, TimeUUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange) { - // Don't record repair history if an upgrade is in progress as version 3 nodes generates errors - // due to schema differences - boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes(); - InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort(); Set<String> participants = Sets.newHashSet(); Set<String> participants_v2 = Sets.newHashSet(); @@ -250,35 +246,18 @@ public final class SystemDistributedKeyspace { for (Range<Token> range : commonRange.ranges) { - String fmtQry; - if (includeNewColumns) - { - fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, - keyspaceName, - cfname, - id.toString(), - parent_id.toString(), - range.left.toString(), - range.right.toString(), - coordinator.getHostAddress(false), - coordinator.getPort(), - Joiner.on("', '").join(participants), - Joiner.on("', '").join(participants_v2), - RepairState.STARTED.toString()); - } - else - { - fmtQry = format(queryWithoutNewColumns, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, - keyspaceName, - cfname, - id.toString(), - parent_id.toString(), - range.left.toString(), - range.right.toString(), - coordinator.getHostAddress(false), - Joiner.on("', 
'").join(participants), - RepairState.STARTED.toString()); - } + String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, + keyspaceName, + cfname, + id.toString(), + parent_id.toString(), + range.left.toString(), + range.right.toString(), + coordinator.getHostAddress(false), + coordinator.getPort(), + Joiner.on("', '").join(participants), + Joiner.on("', '").join(participants_v2), + RepairState.STARTED.toString()); processSilent(fmtQry); } } diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 7ff6b87c59..2f4270ace6 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -18,23 +18,67 @@ package org.apache.cassandra.tcm.compatibility; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.HeartBeatState; +import org.apache.cassandra.gms.TokenSerializer; +import org.apache.cassandra.gms.VersionGenerator; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.InetAddressAndPort; import 
org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Period; +import org.apache.cassandra.tcm.extensions.ExtensionKey; +import org.apache.cassandra.tcm.extensions.ExtensionValue; import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.TokenMap; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.LockedRanges; +import org.apache.cassandra.utils.CassandraVersion; + +import static org.apache.cassandra.gms.ApplicationState.DC; +import static org.apache.cassandra.gms.ApplicationState.HOST_ID; +import static org.apache.cassandra.gms.ApplicationState.INTERNAL_ADDRESS_AND_PORT; +import static org.apache.cassandra.gms.ApplicationState.INTERNAL_IP; +import static org.apache.cassandra.gms.ApplicationState.NATIVE_ADDRESS_AND_PORT; +import static org.apache.cassandra.gms.ApplicationState.RACK; +import static org.apache.cassandra.gms.ApplicationState.RPC_ADDRESS; +import static org.apache.cassandra.gms.ApplicationState.TOKENS; +import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue; +import static org.apache.cassandra.locator.InetAddressAndPort.getByName; +import static org.apache.cassandra.locator.InetAddressAndPort.getByNameOverrideDefaults; +import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; public class GossipHelper { @@ 
-44,6 +88,209 @@ public class GossipHelper StorageService.instance.valueFactory.schema(version)); } + public static void removeFromGossip(InetAddressAndPort addr) + { + Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(addr)); + } + + /** + * Basic idea is that we can't ever bump the generation or version for a remote node + * + * If the remote node is not yet known, set generation and version to 0 to make sure that we don't overwrite + * any state generated by the remote node itself + * + * If the remote node is known, keep the remote generation and version and just update the versioned value in + * place, this makes sure that if the remote node changed, those values will override anything we have here. + */ + public static void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata) + { + mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId)); + } + + public static void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collection<Token> tokens) + { + boolean isLocal = nodeId.equals(metadata.myNodeId()); + IPartitioner partitioner = metadata.tokenMap.partitioner(); + NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId); + Location location = metadata.directory.location(nodeId); + InetAddressAndPort endpoint = addresses.broadcastAddress; + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner, + isLocal ? 
VersionGenerator::getNextVersion : () -> 0); + Gossiper.runInGossipStageBlocking(() -> { + EndpointState epstate = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (epstate == null) + epstate = new EndpointState(HeartBeatState.empty()); + Map<ApplicationState, VersionedValue> newStates = new EnumMap<>(ApplicationState.class); + for (ApplicationState appState : ApplicationState.values()) + { + VersionedValue value = epstate.getApplicationState(appState); + VersionedValue newValue = null; + switch (appState) + { + case DC: + newValue = valueFactory.datacenter(location.datacenter); + break; + case RACK: + newValue = valueFactory.rack(location.rack); + break; + case RELEASE_VERSION: + newValue = valueFactory.releaseVersion(metadata.directory.version(nodeId).cassandraVersion.toString()); + break; + case RPC_ADDRESS: + newValue = valueFactory.rpcaddress(endpoint.getAddress()); + break; + case HOST_ID: + if (getBroadcastAddressAndPort().equals(addresses.broadcastAddress)) + SystemKeyspace.setLocalHostId(nodeId.uuid); + newValue = valueFactory.hostId(nodeId.uuid); + break; + case TOKENS: + if (tokens != null) + newValue = valueFactory.tokens(tokens); + break; + case INTERNAL_ADDRESS_AND_PORT: + newValue = valueFactory.internalAddressAndPort(addresses.localAddress); + break; + case NATIVE_ADDRESS_AND_PORT: + newValue = valueFactory.nativeaddressAndPort(addresses.nativeAddress); + break; + case STATUS: + // only publish/add STATUS if there are non-upgraded hosts + if (metadata.directory.versions.values().stream().anyMatch(v -> !v.isUpgraded()) && tokens != null && !tokens.isEmpty()) + newValue = nodeStateToStatus(metadata.directory.peerState(nodeId), tokens, valueFactory); + break; + case STATUS_WITH_PORT: + if (tokens != null && !tokens.isEmpty()) + newValue = nodeStateToStatus(metadata.directory.peerState(nodeId), tokens, valueFactory); + break; + default: + newValue = value; + } + if (newValue != null) + { + // note that version needs to be > -1 here, 
otherwise Gossiper#sendAll on generation change doesn't send it + if (!isLocal) + newValue = unsafeMakeVersionedValue(newValue.value, value == null ? 0 : value.version); + newStates.put(appState, newValue); + } + } + HeartBeatState heartBeatState = new HeartBeatState(epstate.getHeartBeatState().getGeneration(), isLocal ? VersionGenerator.getNextVersion() : 0); + Gossiper.instance.unsafeUpdateEpStates(endpoint, new EndpointState(heartBeatState, newStates)); + }); + } + + private static VersionedValue nodeStateToStatus(NodeState nodeState, Collection<Token> tokens, VersionedValue.VersionedValueFactory valueFactory) + { + VersionedValue status = null; + switch (nodeState) + { + case JOINED: + status = valueFactory.normal(tokens); + break; + case LEFT: + status = valueFactory.left(tokens, Gossiper.computeExpireTime()); + break; + case BOOTSTRAPPING: + status = valueFactory.bootstrapping(tokens); + break; + case LEAVING: + status = valueFactory.leaving(tokens); + break; + case MOVING: + status = valueFactory.moving(tokens.iterator().next()); //todo + break; + case REGISTERED: + break; + default: + throw new RuntimeException("Bad NodeState " + nodeState); + } + return status; + } + + public static void mergeAllNodeStatesToGossip(ClusterMetadata metadata) + { + Gossiper.runInGossipStageBlocking(() -> { + for (Map.Entry<NodeId, NodeState> entry : metadata.directory.states.entrySet()) + mergeNodeToGossip(entry.getKey(), metadata); + }); + } + + private static Collection<Token> getTokensIn(IPartitioner partitioner, EndpointState epState) + { + try + { + if (epState == null) + return Collections.emptyList(); + + VersionedValue versionedValue = epState.getApplicationState(TOKENS); + if (versionedValue == null) + return Collections.emptyList(); + + return TokenSerializer.deserialize(partitioner, new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes()))); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static NodeState 
toNodeState(InetAddressAndPort endpoint, EndpointState epState) + { + assert epState != null; + + String status = epState.getStatus(); + if (status.equals(VersionedValue.STATUS_NORMAL) || + status.equals(VersionedValue.SHUTDOWN)) + return NodeState.JOINED; + if (status.equals(VersionedValue.STATUS_LEFT)) + return NodeState.LEFT; + throw new IllegalStateException("Can't upgrade the first node when STATUS = " + status + " for node " + endpoint); + } + + private static NodeAddresses getAddressesFromEndpointState(InetAddressAndPort endpoint, EndpointState epState) + { + if (endpoint.equals(getBroadcastAddressAndPort())) + return NodeAddresses.current(); + try + { + InetAddressAndPort local = getEitherState(endpoint, epState, INTERNAL_ADDRESS_AND_PORT, INTERNAL_IP, DatabaseDescriptor.getStoragePort()); + InetAddressAndPort nativeAddress = getEitherState(endpoint, epState, NATIVE_ADDRESS_AND_PORT, RPC_ADDRESS, DatabaseDescriptor.getNativeTransportPort()); + return new NodeAddresses(endpoint, local, nativeAddress); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown host in epState for " + endpoint + " : " + epState, e); + } + } + + private static InetAddressAndPort getEitherState(InetAddressAndPort endpoint, + EndpointState epState, + ApplicationState primaryState, + ApplicationState deprecatedState, + int defaultPortForDeprecatedState) throws UnknownHostException + { + if (epState.getApplicationState(primaryState) != null) + { + return getByName(epState.getApplicationState(primaryState).value); + } + else if (epState.getApplicationState(deprecatedState) != null) + { + return getByNameOverrideDefaults(epState.getApplicationState(deprecatedState).value, defaultPortForDeprecatedState); + } + else + { + return endpoint.withPort(defaultPortForDeprecatedState); + } + } + + private static NodeVersion getVersionFromEndpointState(InetAddressAndPort endpoint, EndpointState epState) + { + if (endpoint.equals(getBroadcastAddressAndPort())) + return 
NodeVersion.CURRENT; + CassandraVersion cassandraVersion = epState.getReleaseVersion(); + return NodeVersion.fromCassandraVersion(cassandraVersion); + } + public static ClusterMetadata emptyWithSchemaFromSystemTables() { return new ClusterMetadata(Epoch.UPGRADE_STARTUP, @@ -59,4 +306,60 @@ public class GossipHelper Collections.emptySet(), Collections.emptyMap()); } + + public static ClusterMetadata fromEndpointStates(DistributedSchema schema, Map<InetAddressAndPort, EndpointState> epStates) + { + return fromEndpointStates(epStates, DatabaseDescriptor.getPartitioner(), schema); + } + @VisibleForTesting + public static ClusterMetadata fromEndpointStates(Map<InetAddressAndPort, EndpointState> epStates, IPartitioner partitioner, DistributedSchema schema) + { + Directory directory = new Directory(); + TokenMap tokenMap = new TokenMap(partitioner); + List<InetAddressAndPort> sortedEps = Lists.newArrayList(epStates.keySet()); + Collections.sort(sortedEps); + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(); + for (InetAddressAndPort endpoint : sortedEps) + { + EndpointState epState = epStates.get(endpoint); + String dc = epState.getApplicationState(DC).value; + String rack = epState.getApplicationState(RACK).value; + String hostIdString = epState.getApplicationState(HOST_ID).value; + NodeAddresses nodeAddresses = getAddressesFromEndpointState(endpoint, epState); + NodeVersion nodeVersion = getVersionFromEndpointState(endpoint, epState); + assert hostIdString != null; + directory = directory.withNonUpgradedNode(nodeAddresses, + new Location(dc, rack), + nodeVersion, + toNodeState(endpoint, epState), + UUID.fromString(hostIdString)); + NodeId nodeId = directory.peerId(endpoint); + tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, epState)); + } + + ClusterMetadata forPlacementCalculation = new ClusterMetadata(Epoch.UPGRADE_GOSSIP, + Period.EMPTY, + true, + partitioner, + schema, + directory, + tokenMap, + DataPlacements.empty(), + 
LockedRanges.EMPTY, + InProgressSequences.EMPTY, + Collections.emptySet(), + extensions); + return new ClusterMetadata(Epoch.UPGRADE_GOSSIP, + Period.EMPTY, + true, + partitioner, + schema, + directory, + tokenMap, + new UniformRangePlacement().calculatePlacements(forPlacementCalculation, schema.getKeyspaces()), + LockedRanges.EMPTY, + InProgressSequences.EMPTY, + Collections.emptySet(), + extensions); + } } diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index c6ca1f28a0..8101b6721f 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -121,10 +121,9 @@ public final class TraceKeyspace Row.SimpleBuilder rb = builder.row(); rb.ttl(ttl) .add("client", client) - .add("coordinator", FBUtilities.getBroadcastAddressAndPort().getAddress()); - if (!Gossiper.instance.hasMajorVersion3Nodes()) - rb.add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().getPort()); - rb.add("request", request) + .add("coordinator", FBUtilities.getBroadcastAddressAndPort().getAddress()) + .add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().getPort()) + .add("request", request) .add("started_at", new Date(startedAt)) .add("command", command) .appendAll("parameters", parameters); @@ -148,10 +147,9 @@ public final class TraceKeyspace .ttl(ttl); rowBuilder.add("activity", message) - .add("source", FBUtilities.getBroadcastAddressAndPort().getAddress()); - if (!Gossiper.instance.hasMajorVersion3Nodes()) - rowBuilder.add("source_port", FBUtilities.getBroadcastAddressAndPort().getPort()); - rowBuilder.add("thread", threadName); + .add("source", FBUtilities.getBroadcastAddressAndPort().getAddress()) + .add("source_port", FBUtilities.getBroadcastAddressAndPort().getPort()) + .add("thread", threadName); if (elapsed >= 0) rowBuilder.add("source_elapsed", elapsed); 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
