This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 186b5bc8d39 Do not retry task actions when a lock is revoked (#18182)
186b5bc8d39 is described below

commit 186b5bc8d39a49c7e77a5d8e906cb373b819b888
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jul 1 17:26:52 2025 +0530

    Do not retry task actions when a lock is revoked (#18182)
    
    Changes:
    - Throw a DruidException of conflict category (HTTP 409) if task locks are 
revoked so that the `ServiceClient`
    does not retry these errors.
    - Reduce the default number of retries in the remote task action client to 
13 (about 10 minutes worth of retries).
---
 docs/configuration/index.md                        |  4 ++--
 .../error/InsertLockPreemptedFaultTest.java        |  2 +-
 .../druid/indexing/common/RetryPolicyConfig.java   |  2 +-
 .../druid/indexing/common/actions/TaskLocks.java   | 15 +++++++-----
 .../common/actions/RemoteTaskActionClientTest.java | 27 ++++++++++++++++++++++
 .../SegmentTransactionalInsertActionTest.java      | 13 +++++++----
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  4 ++--
 .../ConcurrentReplaceAndStreamingAppendTest.java   |  4 ++--
 .../org/apache/druid/error/DruidException.java     |  5 ++++
 .../apache/druid/error/DruidExceptionMatcher.java  |  9 ++++++++
 .../org/apache/druid/rpc/ServiceClientImpl.java    |  2 +-
 11 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 60c70fb27ff..7f8d0855284 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1422,7 +1422,7 @@ If the Peon is running in remote mode, there must be an 
Overlord up and running.
 |--------|-----------|-------|
 |`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to 
communicate with Overlord.|`PT5S`|
 |`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to 
communicate with Overlord.|`PT1M`|
-|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of 
retries to communicate with Overlord.|60|
+|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of 
retries to communicate with Overlord.|13 (about 10 minutes of retrying)|
 
 ##### SegmentWriteOutMediumFactory
 
@@ -1475,7 +1475,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory` 
can be configured per-ta
 |`druid.indexer.task.storeEmptyColumns`|Boolean value for whether or not to 
store empty columns during ingestion. When set to true, Druid stores every 
column specified in the 
[`dimensionsSpec`](../ingestion/ingestion-spec.md#dimensionsspec). <br/><br/>If 
you set `storeEmptyColumns` to false, Druid SQL queries referencing empty 
columns will fail. If you intend to leave `storeEmptyColumns` disabled, you 
should either ingest placeholder data for empty columns or else not query on 
empty colu [...]
 |`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to 
communicate with Overlord.|`PT5S`|
 |`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to 
communicate with Overlord.|`PT1M`|
-|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of 
retries to communicate with Overlord.|60|
+|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of 
retries to communicate with Overlord.|13 (about 10 minutes of retrying)|
 
 #### Indexer concurrent requests
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
index 5da165dbc46..e8abb73e56e 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
@@ -66,7 +66,7 @@ public class InsertLockPreemptedFaultTest extends MSQTestBase
     {
       if (preempted) {
         throw new ISE(
-            "Segments[dummySegment] are not covered by locks[dummyLock] for 
task[dummyTask]"
+            "Segment IDs[dummySegment] are not covered by locks[dummyLock] for 
task[dummyTask]"
         );
       }
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
index 22b03356256..b98475d7f5a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
@@ -33,7 +33,7 @@ public class RetryPolicyConfig
   private Period maxWait = new Period("PT1M");
 
   @JsonProperty
-  private long maxRetryCount = 60;
+  private long maxRetryCount = 13;
 
   public Period getMinWait()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
index b7c5d27f705..82c8990c748 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.actions;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentLock;
 import org.apache.druid.indexing.common.TaskLock;
@@ -57,12 +58,14 @@ public class TaskLocks
   )
   {
     if (!isLockCoversSegments(task, taskLockbox, segments)) {
-      throw new ISE(
-          "Segments[%s] are not covered by locks[%s] for task[%s]",
-          segments,
-          taskLockbox.findLocksForTask(task),
-          task.getId()
-      );
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.CONFLICT)
+                          .build(
+                              "Segment IDs[%s] are not covered by locks[%s] 
for task[%s]",
+                              
segments.stream().map(DataSegment::getId).collect(Collectors.toSet()),
+                              taskLockbox.findLocksForTask(task),
+                              task.getId()
+                          );
     }
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
index af3b9625e47..f6db5ea50d7 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.actions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TimeChunkLock;
@@ -33,6 +34,8 @@ import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceClientImpl;
+import org.apache.druid.rpc.StandardRetryPolicy;
 import org.easymock.EasyMock;
 import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@@ -148,4 +151,28 @@ public class RemoteTaskActionClientTest
 
     EasyMock.verify(directOverlordClient, response);
   }
+
+  @Test
+  public void 
test_defaultTaskActionRetryPolicy_hasMaxRetryDurationOf10Minutes()
+  {
+    final RetryPolicyConfig defaultRetryConfig = new RetryPolicyConfig();
+
+    // Build a policy with the default values for min and max wait
+    // Value of max attempts is irrelevant since we just want to compute back 
off times
+    final StandardRetryPolicy retryPolicy = StandardRetryPolicy
+        .builder()
+        .maxAttempts(1)
+        
.minWaitMillis(defaultRetryConfig.getMinWait().toStandardDuration().getMillis())
+        
.maxWaitMillis(defaultRetryConfig.getMaxWait().toStandardDuration().getMillis())
+        .build();
+
+    final long maxWait10Minutes = 10 * 60 * 1000;
+    long totalWaitTimeMillis = 0;
+    int attempt = 0;
+    for (; totalWaitTimeMillis < maxWait10Minutes; ++attempt) {
+      totalWaitTimeMillis += ServiceClientImpl.computeBackoffMs(retryPolicy, 
attempt);
+    }
+
+    Assert.assertEquals(13, defaultRetryConfig.getMaxRetryCount());
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 6858e1d7199..a64ca5a6b6b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.actions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -34,6 +36,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -217,10 +220,12 @@ public class SegmentTransactionalInsertActionTest
     actionTestKit.getTaskLockbox().add(task);
     acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
 
-    IllegalStateException exception = Assert.assertThrows(
-        IllegalStateException.class,
-        () -> action.perform(task, actionTestKit.getTaskActionToolbox())
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> action.perform(task, actionTestKit.getTaskActionToolbox())
+        ),
+        DruidExceptionMatcher.conflict().expectMessageContains("are not 
covered by locks")
     );
-    Assert.assertTrue(exception.getMessage().contains("are not covered by 
locks"));
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 04d4baf7097..76cbe210611 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -590,8 +590,8 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     final Throwable throwable = Throwables.getRootCause(exception);
     Assert.assertEquals(
         StringUtils.format(
-            "Segments[[%s]] are not covered by locks[[]] for task[%s]",
-            segmentV10, replaceTask.getId()
+            "Segment IDs[[%s]] are not covered by locks[[]] for task[%s]",
+            segmentV10.getId(), replaceTask.getId()
         ),
         throwable.getMessage()
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
index 1b2fc1bbde0..8f374ff16b5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -610,8 +610,8 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
     final Throwable throwable = Throwables.getRootCause(exception);
     Assert.assertEquals(
         StringUtils.format(
-            "Segments[[%s]] are not covered by locks[[]] for task[%s]",
-            segmentV10, replaceTask.getId()
+            "Segment IDs[[%s]] are not covered by locks[[]] for task[%s]",
+            segmentV10.getId(), replaceTask.getId()
         ),
         throwable.getMessage()
     );
diff --git 
a/processing/src/main/java/org/apache/druid/error/DruidException.java 
b/processing/src/main/java/org/apache/druid/error/DruidException.java
index fcf55e40710..4883c811562 100644
--- a/processing/src/main/java/org/apache/druid/error/DruidException.java
+++ b/processing/src/main/java/org/apache/druid/error/DruidException.java
@@ -374,6 +374,11 @@ public class DruidException extends RuntimeException
      * Means that the requested resource cannot be found.
      */
     NOT_FOUND(404),
+    /**
+     * Indicates that the request could not be completed due to a conflict with
+     * the current state of the target resource.
+     */
+    CONFLICT(409),
     /**
      * Means that some capacity limit was exceeded, this could be due to 
throttling or due to some system limit
      */
diff --git 
a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java 
b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
index cdaf88785ec..67e96b86d74 100644
--- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
@@ -68,6 +68,15 @@ public class DruidExceptionMatcher extends 
DiagnosingMatcher<Throwable>
     return new DruidExceptionMatcher(DruidException.Persona.USER, 
DruidException.Category.FORBIDDEN, "general");
   }
 
+  public static DruidExceptionMatcher conflict()
+  {
+    return new DruidExceptionMatcher(
+        DruidException.Persona.OPERATOR,
+        DruidException.Category.CONFLICT,
+        "general"
+    );
+  }
+
   public static DruidExceptionMatcher defensive()
   {
     return new DruidExceptionMatcher(
diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java 
b/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
index a1f63c5c8b2..a75a8f6a77d 100644
--- a/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
@@ -487,7 +487,7 @@ public class ServiceClientImpl implements ServiceClient
   }
 
   @VisibleForTesting
-  static long computeBackoffMs(final ServiceRetryPolicy retryPolicy, final 
long attemptNumber)
+  public static long computeBackoffMs(final ServiceRetryPolicy retryPolicy, 
final long attemptNumber)
   {
     return Math.max(
         retryPolicy.minWaitMillis(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to