This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 186b5bc8d39 Do not retry task actions when a lock is revoked (#18182)
186b5bc8d39 is described below
commit 186b5bc8d39a49c7e77a5d8e906cb373b819b888
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jul 1 17:26:52 2025 +0530
Do not retry task actions when a lock is revoked (#18182)
Changes:
- Throw a DruidException of the `CONFLICT` category (HTTP 409) if task locks are
revoked, so that the `ServiceClient` does not retry these errors.
- Reduce the default number of retries in the remote task action client from
60 to 13 (about 10 minutes' worth of retries).
---
docs/configuration/index.md | 4 ++--
.../error/InsertLockPreemptedFaultTest.java | 2 +-
.../druid/indexing/common/RetryPolicyConfig.java | 2 +-
.../druid/indexing/common/actions/TaskLocks.java | 15 +++++++-----
.../common/actions/RemoteTaskActionClientTest.java | 27 ++++++++++++++++++++++
.../SegmentTransactionalInsertActionTest.java | 13 +++++++----
.../concurrent/ConcurrentReplaceAndAppendTest.java | 4 ++--
.../ConcurrentReplaceAndStreamingAppendTest.java | 4 ++--
.../org/apache/druid/error/DruidException.java | 5 ++++
.../apache/druid/error/DruidExceptionMatcher.java | 9 ++++++++
.../org/apache/druid/rpc/ServiceClientImpl.java | 2 +-
11 files changed, 68 insertions(+), 19 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 60c70fb27ff..7f8d0855284 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1422,7 +1422,7 @@ If the Peon is running in remote mode, there must be an
Overlord up and running.
|--------|-----------|-------|
|`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to
communicate with Overlord.|`PT5S`|
|`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to
communicate with Overlord.|`PT1M`|
-|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of
retries to communicate with Overlord.|60|
+|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of
retries to communicate with Overlord.|13 (about 10 minutes of retrying)|
##### SegmentWriteOutMediumFactory
@@ -1475,7 +1475,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory`
can be configured per-ta
|`druid.indexer.task.storeEmptyColumns`|Boolean value for whether or not to
store empty columns during ingestion. When set to true, Druid stores every
column specified in the
[`dimensionsSpec`](../ingestion/ingestion-spec.md#dimensionsspec). <br/><br/>If
you set `storeEmptyColumns` to false, Druid SQL queries referencing empty
columns will fail. If you intend to leave `storeEmptyColumns` disabled, you
should either ingest placeholder data for empty columns or else not query on
empty colu [...]
|`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to
communicate with Overlord.|`PT5S`|
|`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to
communicate with Overlord.|`PT1M`|
-|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of
retries to communicate with Overlord.|60|
+|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of
retries to communicate with Overlord.|13 (about 10 minutes of retrying)|
#### Indexer concurrent requests
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
index 5da165dbc46..e8abb73e56e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFaultTest.java
@@ -66,7 +66,7 @@ public class InsertLockPreemptedFaultTest extends MSQTestBase
{
if (preempted) {
throw new ISE(
- "Segments[dummySegment] are not covered by locks[dummyLock] for
task[dummyTask]"
+ "Segment IDs[dummySegment] are not covered by locks[dummyLock] for
task[dummyTask]"
);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
index 22b03356256..b98475d7f5a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/RetryPolicyConfig.java
@@ -33,7 +33,7 @@ public class RetryPolicyConfig
private Period maxWait = new Period("PT1M");
@JsonProperty
- private long maxRetryCount = 60;
+ private long maxRetryCount = 13;
public Period getMinWait()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
index b7c5d27f705..82c8990c748 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.actions;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
@@ -57,12 +58,14 @@ public class TaskLocks
)
{
if (!isLockCoversSegments(task, taskLockbox, segments)) {
- throw new ISE(
- "Segments[%s] are not covered by locks[%s] for task[%s]",
- segments,
- taskLockbox.findLocksForTask(task),
- task.getId()
- );
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.CONFLICT)
+ .build(
+ "Segment IDs[%s] are not covered by locks[%s]
for task[%s]",
+
segments.stream().map(DataSegment::getId).collect(Collectors.toSet()),
+ taskLockbox.findLocksForTask(task),
+ task.getId()
+ );
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
index af3b9625e47..f6db5ea50d7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
@@ -33,6 +34,8 @@ import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceClientImpl;
+import org.apache.druid.rpc.StandardRetryPolicy;
import org.easymock.EasyMock;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@@ -148,4 +151,28 @@ public class RemoteTaskActionClientTest
EasyMock.verify(directOverlordClient, response);
}
+
+ @Test
+ public void test_defaultTaskActionRetryPolicy_hasMaxRetryDurationOf10Minutes()
+ {
+ final RetryPolicyConfig defaultRetryConfig = new RetryPolicyConfig();
+
+ // Build a policy with the default values for min and max wait.
+ // Value of max attempts is irrelevant since we just want to compute backoff times.
+ final StandardRetryPolicy retryPolicy = StandardRetryPolicy
+ .builder()
+ .maxAttempts(1)
+ .minWaitMillis(defaultRetryConfig.getMinWait().toStandardDuration().getMillis())
+ .maxWaitMillis(defaultRetryConfig.getMaxWait().toStandardDuration().getMillis())
+ .build();
+
+ final long maxWait10Minutes = 10 * 60 * 1000;
+ long totalWaitTimeMillis = 0;
+ int attempt = 0;
+ for (; totalWaitTimeMillis < maxWait10Minutes; ++attempt) {
+ totalWaitTimeMillis += ServiceClientImpl.computeBackoffMs(retryPolicy, attempt);
+ }
+ // Default retry count must equal the number of retries whose total backoff first reaches 10 minutes
+ Assert.assertEquals(attempt, defaultRetryConfig.getMaxRetryCount());
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 6858e1d7199..a64ca5a6b6b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -34,6 +36,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
@@ -217,10 +220,12 @@ public class SegmentTransactionalInsertActionTest
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
- IllegalStateException exception = Assert.assertThrows(
- IllegalStateException.class,
- () -> action.perform(task, actionTestKit.getTaskActionToolbox())
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> action.perform(task, actionTestKit.getTaskActionToolbox())
+ ),
+ DruidExceptionMatcher.conflict().expectMessageContains("are not
covered by locks")
);
- Assert.assertTrue(exception.getMessage().contains("are not covered by
locks"));
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 04d4baf7097..76cbe210611 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -590,8 +590,8 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
final Throwable throwable = Throwables.getRootCause(exception);
Assert.assertEquals(
StringUtils.format(
- "Segments[[%s]] are not covered by locks[[]] for task[%s]",
- segmentV10, replaceTask.getId()
+ "Segment IDs[[%s]] are not covered by locks[[]] for task[%s]",
+ segmentV10.getId(), replaceTask.getId()
),
throwable.getMessage()
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
index 1b2fc1bbde0..8f374ff16b5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -610,8 +610,8 @@ public class ConcurrentReplaceAndStreamingAppendTest
extends IngestionTestBase
final Throwable throwable = Throwables.getRootCause(exception);
Assert.assertEquals(
StringUtils.format(
- "Segments[[%s]] are not covered by locks[[]] for task[%s]",
- segmentV10, replaceTask.getId()
+ "Segment IDs[[%s]] are not covered by locks[[]] for task[%s]",
+ segmentV10.getId(), replaceTask.getId()
),
throwable.getMessage()
);
diff --git
a/processing/src/main/java/org/apache/druid/error/DruidException.java
b/processing/src/main/java/org/apache/druid/error/DruidException.java
index fcf55e40710..4883c811562 100644
--- a/processing/src/main/java/org/apache/druid/error/DruidException.java
+++ b/processing/src/main/java/org/apache/druid/error/DruidException.java
@@ -374,6 +374,11 @@ public class DruidException extends RuntimeException
* Means that the requested resource cannot be found.
*/
NOT_FOUND(404),
+ /**
+ * Indicates that the request could not be completed due to a conflict with
+ * the current state of the target resource.
+ */
+ CONFLICT(409),
/**
* Means that some capacity limit was exceeded, this could be due to
throttling or due to some system limit
*/
diff --git
a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
index cdaf88785ec..67e96b86d74 100644
--- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
@@ -68,6 +68,15 @@ public class DruidExceptionMatcher extends
DiagnosingMatcher<Throwable>
return new DruidExceptionMatcher(DruidException.Persona.USER,
DruidException.Category.FORBIDDEN, "general");
}
+ /**
+ * Returns a matcher for a {@link DruidException} targeted at the OPERATOR persona
+ * with category CONFLICT (HTTP 409). NOTE(review): "general" is presumably the
+ * expected error code; confirm against the DruidExceptionMatcher constructor.
+ */
+ public static DruidExceptionMatcher conflict()
+ {
+ return new DruidExceptionMatcher(
+ DruidException.Persona.OPERATOR,
+ DruidException.Category.CONFLICT,
+ "general"
+ );
+ }
+
public static DruidExceptionMatcher defensive()
{
return new DruidExceptionMatcher(
diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
b/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
index a1f63c5c8b2..a75a8f6a77d 100644
--- a/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java
@@ -487,7 +487,7 @@ public class ServiceClientImpl implements ServiceClient
}
@VisibleForTesting
- static long computeBackoffMs(final ServiceRetryPolicy retryPolicy, final
long attemptNumber)
+ public static long computeBackoffMs(final ServiceRetryPolicy retryPolicy,
final long attemptNumber)
{
return Math.max(
retryPolicy.minWaitMillis(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]