This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 367dec0a48 TCM's Retry.Deadline#retryIndefinitely is dangerous if used with RemoteProcessor as the deadline does not impact message retries
367dec0a48 is described below

commit 367dec0a48ed5fe11cc1d581b9ceb703928684b8
Author: David Capwell <[email protected]>
AuthorDate: Mon Dec 9 10:29:28 2024 -0800

    TCM's Retry.Deadline#retryIndefinitely is dangerous if used with RemoteProcessor as the deadline does not impact message retries
    
    patch by David Capwell; reviewed by Alex Petrov, Sam Tunnicliffe for CASSANDRA-20059
---
 src/java/org/apache/cassandra/tcm/FetchCMSLog.java |  4 +--
 src/java/org/apache/cassandra/tcm/Processor.java   | 37 ++++++++++++++++++++--
 .../apache/cassandra/tcm/ReconstructLogState.java  |  4 +--
 src/java/org/apache/cassandra/tcm/Retry.java       | 31 +-----------------
 .../distributed/test/log/ReconstructEpochTest.java |  4 +--
 5 files changed, 41 insertions(+), 39 deletions(-)

diff --git a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java 
b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
index 943c3b08fc..ae1d431abb 100644
--- a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
+++ b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
@@ -115,8 +115,8 @@ public class FetchCMSLog
             // If both we and the other node believe it should be caught up 
with a linearizable read
             boolean consistentFetch = request.consistentFetch && 
!ClusterMetadataService.instance().isCurrentMember(message.from());
 
-            Retry.Deadline retry = 
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-                                                                    
TCMMetrics.instance.fetchLogRetries);
+            Retry.Deadline retry = 
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                        new 
Retry.Jitter(TCMMetrics.instance.fetchLogRetries));
             LogState delta;
             if (consistentFetch)
                 delta = 
processor.get().getLogState(message.payload.lowerBound, Epoch.MAX, false, 
retry);
diff --git a/src/java/org/apache/cassandra/tcm/Processor.java 
b/src/java/org/apache/cassandra/tcm/Processor.java
index 021a11686d..33f4550887 100644
--- a/src/java/org/apache/cassandra/tcm/Processor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -24,10 +24,12 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import accord.utils.Invariants;
+import com.codahale.metrics.Meter;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.utils.Clock;
 
 public interface Processor
 {
@@ -42,9 +44,7 @@ public interface Processor
         // submit the STARTUP message. This allows the bounces affecting 
majority of CMS nodes to finish successfully.
         if (transform.kind() == Transformation.Kind.STARTUP)
         {
-            return commit(entryId, transform, lastKnown,
-                          
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-                                                           
TCMMetrics.instance.commitRetries));
+            return commit(entryId, transform, lastKnown, 
unsafeRetryIndefinitely());
         }
 
         return commit(entryId, transform, lastKnown,
@@ -52,6 +52,37 @@ public interface Processor
                                            new 
Retry.Jitter(TCMMetrics.instance.commitRetries)));
     }
 
+    /**
+     * Since we are using message expiration for communicating timeouts to CMS 
nodes, we have to be careful not
+     * to overflow the long, since messaging is using only 32 bits for 
deadlines. To achieve that, we are
+     * giving `timeoutNanos` every time we retry, but will retry indefinitely.
+     */
+    private static Retry.Deadline unsafeRetryIndefinitely()
+    {
+        long timeoutNanos = 
DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS);
+        Meter retryMeter = TCMMetrics.instance.commitRetries;
+        return new Retry.Deadline(Clock.Global.nanoTime() + timeoutNanos,
+                                  new Retry.Jitter(retryMeter))
+        {
+            @Override
+            public boolean reachedMax()
+            {
+                return false;
+            }
+
+            @Override
+            public long remainingNanos()
+            {
+                return timeoutNanos;
+            }
+
+            public String toString()
+            {
+                return String.format("RetryIndefinitely{tries=%d}", 
currentTries());
+            }
+        };
+    }
+
     Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch 
lastKnown, Retry.Deadline retryPolicy);
 
     /**
diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java 
b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
index c8930853ad..dcc0b395e7 100644
--- a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
+++ b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
@@ -97,8 +97,8 @@ public class ReconstructLogState
                 throw new NotCMSException("This node is not in the CMS, can't 
generate a consistent log fetch response to " + message.from());
 
             LogState result = processor.get().getLogState(request.lowerBound, 
request.higherBound, request.includeSnapshot,
-                                                          
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-                                                                               
            TCMMetrics.instance.fetchLogRetries));
+                                                          
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                               
new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
 
             MessagingService.instance().send(message.responseWith(result), 
message.from());
         }
diff --git a/src/java/org/apache/cassandra/tcm/Retry.java 
b/src/java/org/apache/cassandra/tcm/Retry.java
index a954a3d1da..bb5f3dedac 100644
--- a/src/java/org/apache/cassandra/tcm/Retry.java
+++ b/src/java/org/apache/cassandra/tcm/Retry.java
@@ -190,7 +190,7 @@ public abstract class Retry
         public final long deadlineNanos;
         protected final Retry delegate;
 
-        private Deadline(long deadlineNanos, Retry delegate)
+        public Deadline(long deadlineNanos, Retry delegate)
         {
             super(delegate.maxTries, delegate.retryMeter);
             assert deadlineNanos > 0 : String.format("Deadline should be 
strictly positive but was %d.", deadlineNanos);
@@ -208,35 +208,6 @@ public abstract class Retry
             return new Deadline(Clock.Global.nanoTime() + timeoutNanos, 
delegate);
         }
 
-        /**
-         * Since we are using message expiration for communicating timeouts to 
CMS nodes, we have to be careful not
-         * to overflow the long, since messaging is using only 32 bits for 
deadlines. To achieve that, we are
-         * giving `timeoutNanos` every time we retry, but will retry 
indefinitely.
-         */
-        public static Deadline retryIndefinitely(long timeoutNanos, Meter 
retryMeter)
-        {
-            return new Deadline(Clock.Global.nanoTime() + timeoutNanos,
-                                new Retry.Jitter(Integer.MAX_VALUE, 
DEFAULT_BACKOFF_MS, new Random(), retryMeter))
-            {
-                @Override
-                public boolean reachedMax()
-                {
-                    return false;
-                }
-
-                @Override
-                public long remainingNanos()
-                {
-                    return timeoutNanos;
-                }
-
-                public String toString()
-                {
-                    return String.format("RetryIndefinitely{tries=%d}", 
currentTries());
-                }
-            };
-        }
-
         public static Deadline wrap(Retry delegate)
         {
             long deadlineMillis = delegate.maxTries * delegate.maxWait();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
index cf795048c3..1f76598fea 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
@@ -80,8 +80,8 @@ public class ReconstructEpochTest extends TestBaseImpl
                                                               
.getLogState(Epoch.create(start),
                                                                            
Epoch.create(end),
                                                                            
true,
-                                                                           
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
-                                                                               
                             TCMMetrics.instance.commitRetries));
+                                                                           
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                               
                 new Retry.Jitter(TCMMetrics.instance.commitRetries)));
 
                     Assert.assertEquals(start, 
logState.baseState.epoch.getEpoch());
                     Iterator<Entry> iter = logState.entries.iterator();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to