This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 367dec0a48 TCM's Retry.Deadline#retryIndefinitely is dangerous if used
with RemoteProcessor as the deadline does not impact message retries
367dec0a48 is described below
commit 367dec0a48ed5fe11cc1d581b9ceb703928684b8
Author: David Capwell <[email protected]>
AuthorDate: Mon Dec 9 10:29:28 2024 -0800
TCM's Retry.Deadline#retryIndefinitely is dangerous if used with
RemoteProcessor as the deadline does not impact message retries
patch by David Capwell; reviewed by Alex Petrov, Sam Tunnicliffe for
CASSANDRA-20059
---
src/java/org/apache/cassandra/tcm/FetchCMSLog.java | 4 +--
src/java/org/apache/cassandra/tcm/Processor.java | 37 ++++++++++++++++++++--
.../apache/cassandra/tcm/ReconstructLogState.java | 4 +--
src/java/org/apache/cassandra/tcm/Retry.java | 31 +-----------------
.../distributed/test/log/ReconstructEpochTest.java | 4 +--
5 files changed, 41 insertions(+), 39 deletions(-)
diff --git a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
index 943c3b08fc..ae1d431abb 100644
--- a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
+++ b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
@@ -115,8 +115,8 @@ public class FetchCMSLog
// If both we and the other node believe it should be caught up
with a linearizable read
boolean consistentFetch = request.consistentFetch &&
!ClusterMetadataService.instance().isCurrentMember(message.from());
- Retry.Deadline retry =
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-
TCMMetrics.instance.fetchLogRetries);
+ Retry.Deadline retry =
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+ new
Retry.Jitter(TCMMetrics.instance.fetchLogRetries));
LogState delta;
if (consistentFetch)
delta =
processor.get().getLogState(message.payload.lowerBound, Epoch.MAX, false,
retry);
diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java
index 021a11686d..33f4550887 100644
--- a/src/java/org/apache/cassandra/tcm/Processor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -24,10 +24,12 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import accord.utils.Invariants;
+import com.codahale.metrics.Meter;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.utils.Clock;
public interface Processor
{
@@ -42,9 +44,7 @@ public interface Processor
// submit the STARTUP message. This allows the bounces affecting
majority of CMS nodes to finish successfully.
if (transform.kind() == Transformation.Kind.STARTUP)
{
- return commit(entryId, transform, lastKnown,
-
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-
TCMMetrics.instance.commitRetries));
+ return commit(entryId, transform, lastKnown,
unsafeRetryIndefinitely());
}
return commit(entryId, transform, lastKnown,
@@ -52,6 +52,37 @@ public interface Processor
new
Retry.Jitter(TCMMetrics.instance.commitRetries)));
}
+ /**
+ * Since we are using message expiration for communicating timeouts to CMS
nodes, we have to be careful not
+ * to overflow the long, since messaging is using only 32 bits for
deadlines. To achieve that, we are
+ * giving `timeoutNanos` every time we retry, but will retry indefinitely.
+ */
+ private static Retry.Deadline unsafeRetryIndefinitely()
+ {
+ long timeoutNanos =
DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS);
+ Meter retryMeter = TCMMetrics.instance.commitRetries;
+ return new Retry.Deadline(Clock.Global.nanoTime() + timeoutNanos,
+ new Retry.Jitter(retryMeter))
+ {
+ @Override
+ public boolean reachedMax()
+ {
+ return false;
+ }
+
+ @Override
+ public long remainingNanos()
+ {
+ return timeoutNanos;
+ }
+
+ public String toString()
+ {
+ return String.format("RetryIndefinitely{tries=%d}",
currentTries());
+ }
+ };
+ }
+
Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch
lastKnown, Retry.Deadline retryPolicy);
/**
diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
index c8930853ad..dcc0b395e7 100644
--- a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
+++ b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
@@ -97,8 +97,8 @@ public class ReconstructLogState
throw new NotCMSException("This node is not in the CMS, can't
generate a consistent log fetch response to " + message.from());
LogState result = processor.get().getLogState(request.lowerBound,
request.higherBound, request.includeSnapshot,
-
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-
TCMMetrics.instance.fetchLogRetries));
+
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+
new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
MessagingService.instance().send(message.responseWith(result),
message.from());
}
diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java
index a954a3d1da..bb5f3dedac 100644
--- a/src/java/org/apache/cassandra/tcm/Retry.java
+++ b/src/java/org/apache/cassandra/tcm/Retry.java
@@ -190,7 +190,7 @@ public abstract class Retry
public final long deadlineNanos;
protected final Retry delegate;
- private Deadline(long deadlineNanos, Retry delegate)
+ public Deadline(long deadlineNanos, Retry delegate)
{
super(delegate.maxTries, delegate.retryMeter);
assert deadlineNanos > 0 : String.format("Deadline should be
strictly positive but was %d.", deadlineNanos);
@@ -208,35 +208,6 @@ public abstract class Retry
return new Deadline(Clock.Global.nanoTime() + timeoutNanos,
delegate);
}
- /**
- * Since we are using message expiration for communicating timeouts to
CMS nodes, we have to be careful not
- * to overflow the long, since messaging is using only 32 bits for
deadlines. To achieve that, we are
- * giving `timeoutNanos` every time we retry, but will retry
indefinitely.
- */
- public static Deadline retryIndefinitely(long timeoutNanos, Meter
retryMeter)
- {
- return new Deadline(Clock.Global.nanoTime() + timeoutNanos,
- new Retry.Jitter(Integer.MAX_VALUE,
DEFAULT_BACKOFF_MS, new Random(), retryMeter))
- {
- @Override
- public boolean reachedMax()
- {
- return false;
- }
-
- @Override
- public long remainingNanos()
- {
- return timeoutNanos;
- }
-
- public String toString()
- {
- return String.format("RetryIndefinitely{tries=%d}",
currentTries());
- }
- };
- }
-
public static Deadline wrap(Retry delegate)
{
long deadlineMillis = delegate.maxTries * delegate.maxWait();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
index cf795048c3..1f76598fea 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
@@ -80,8 +80,8 @@ public class ReconstructEpochTest extends TestBaseImpl
.getLogState(Epoch.create(start),
Epoch.create(end),
true,
-
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-
TCMMetrics.instance.commitRetries));
+
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+
new Retry.Jitter(TCMMetrics.instance.commitRetries)));
Assert.assertEquals(start,
logState.baseState.epoch.getEpoch());
Iterator<Entry> iter = logState.entries.iterator();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]