This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 1c71e57f46 PaxosPrepare may add instances to the Electorate that are
not in gossip
1c71e57f46 is described below
commit 1c71e57f46f704228d41fb367e31cd11f0602134
Author: David Capwell <[email protected]>
AuthorDate: Thu Jan 26 09:13:24 2023 -0800
PaxosPrepare may add instances to the Electorate that are not in gossip
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18194
---
CHANGES.txt | 1 +
.../org/apache/cassandra/service/paxos/Paxos.java | 40 ++++++++++++++++++----
2 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a29c9e008b..7ecf0e320b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1.1
+ * PaxosPrepare may add instances to the Electorate that are not in gossip
(CASSANDRA-18194)
* Fix PAXOS2_COMMIT_AND_PREPARE_RSP serialisation AssertionError
(CASSANDRA-18164)
* Streaming progress virtual table lock contention can trigger
TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110)
* Fix perpetual load of denylist on read in cases where denylist can never be
loaded (CASSANDRA-18116)
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 5bdb75c78f..bf5f90e55f 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -20,19 +20,23 @@ package org.apache.cassandra.service.paxos;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.codahale.metrics.Meter;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.ExceptionCode;
@@ -90,6 +94,7 @@ import org.apache.cassandra.utils.CollectionSerializer;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteAccepted;
import
org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteCommitted;
+import org.apache.cassandra.utils.NoSpamLogger;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -121,6 +126,7 @@ import static
org.apache.cassandra.service.paxos.PaxosPropose.propose;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.CollectionSerializer.newHashSet;
import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
+import static org.apache.cassandra.utils.NoSpamLogger.Level.WARN;
/**
* <p>This class serves as an entry-point to Cassandra's implementation of
Paxos Consensus.
@@ -204,11 +210,13 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
*/
public class Paxos
{
+ private static final Logger logger = LoggerFactory.getLogger(Paxos.class);
+
private static volatile Config.PaxosVariant PAXOS_VARIANT =
DatabaseDescriptor.getPaxosVariant();
private static final CassandraVersion MODERN_PAXOS_RELEASE = new
CassandraVersion(System.getProperty("cassandra.paxos.modern_release", "4.1"));
static final boolean LOG_TTL_LINEARIZABILITY_VIOLATIONS =
Boolean.parseBoolean(System.getProperty("cassandra.paxos.log_ttl_linearizability_violations",
"true"));
- static class Electorate
+ static class Electorate implements Iterable<InetAddressAndPort>
{
static final Serializer serializer = new Serializer();
@@ -229,10 +237,10 @@ public class Paxos
return natural.size() + pending.size();
}
- public void forEach(Consumer<InetAddressAndPort> forEach)
+ @Override
+ public Iterator<InetAddressAndPort> iterator()
{
- natural.forEach(forEach);
- pending.forEach(forEach);
+ return Iterators.concat(natural.iterator(), pending.iterator());
}
static Electorate get(TableMetadata table, DecoratedKey key,
ConsistencyLevel consistency)
@@ -1156,8 +1164,26 @@ public class Paxos
return emptyMap();
Map<InetAddressAndPort, EndpointState> endpoints =
Maps.newHashMapWithExpectedSize(remoteElectorate.size() +
localElectorate.size());
- remoteElectorate.forEach(host -> endpoints.put(host,
Gossiper.instance.copyEndpointStateForEndpoint(host)));
- localElectorate.forEach(host -> endpoints.putIfAbsent(host,
Gossiper.instance.copyEndpointStateForEndpoint(host)));
+ for (InetAddressAndPort host : remoteElectorate)
+ {
+ EndpointState endpoint =
Gossiper.instance.copyEndpointStateForEndpoint(host);
+ if (endpoint == null)
+ {
+ NoSpamLogger.log(logger, WARN, 1, TimeUnit.MINUTES, "Remote
electorate {} could not be found in Gossip", host);
+ continue;
+ }
+ endpoints.put(host, endpoint);
+ }
+ for (InetAddressAndPort host : localElectorate)
+ {
+ EndpointState endpoint =
Gossiper.instance.copyEndpointStateForEndpoint(host);
+ if (endpoint == null)
+ {
+ NoSpamLogger.log(logger, WARN, 1, TimeUnit.MINUTES, "Local
electorate {} could not be found in Gossip", host);
+ continue;
+ }
+ endpoints.putIfAbsent(host, endpoint);
+ }
return endpoints;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]