This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b46be6eb22d Retry segment publish task actions without holding locks
(#17816)
b46be6eb22d is described below
commit b46be6eb22dfa9ee671b07e279744d9b6a2eb7bd
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Mar 24 14:04:13 2025 +0530
Retry segment publish task actions without holding locks (#17816)
#17802 reverted a retry of failed segment publish actions.
This patch attempts to address the original issue by retrying the segment
publish task actions
on the client (i.e. task) side without holding any locks so that other
transactions are not blocked.
Changes
Add retries to TransactionalSegmentPublisher
Add field retryable to SegmentPublishResult
Remove class DataStoreMetadataUpdateResult and use SegmentPublishResult
instead
---
.../common/task/AbstractBatchIndexTask.java | 25 ++++
.../druid/indexing/common/task/IndexTask.java | 7 +-
.../parallel/ParallelIndexSupervisorTask.java | 8 +-
.../indexing/seekablestream/SequenceMetadata.java | 2 +-
.../SegmentTransactionalInsertActionTest.java | 9 +-
.../indexing/overlord/SegmentPublishResult.java | 50 ++++---
.../IndexerSQLMetadataStorageCoordinator.java | 158 ++++-----------------
.../appenderator/BaseAppenderatorDriver.java | 3 +-
.../TransactionalSegmentPublisher.java | 61 +++++++-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 49 ++++---
...ataStorageCoordinatorSchemaPersistenceTest.java | 2 +-
.../appenderator/BatchAppenderatorDriverTest.java | 18 ++-
.../appenderator/StreamAppenderatorDriverTest.java | 34 ++++-
.../TransactionalSegmentPublisherTest.java | 104 ++++++++++++++
14 files changed, 329 insertions(+), 201 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 38bc961dc47..ec9c12a14fe 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -76,6 +76,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IngestionSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -456,6 +457,30 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
}
}
+ protected TransactionalSegmentPublisher buildSegmentPublisher(TaskToolbox
toolbox)
+ {
+ return new TransactionalSegmentPublisher()
+ {
+ @Override
+ public SegmentPublishResult publishAnnotatedSegments(
+ @Nullable Set<DataSegment> segmentsToBeOverwritten,
+ Set<DataSegment> segmentsToPublish,
+ @Nullable Object commitMetadata,
+ @Nullable SegmentSchemaMapping schemaMapping
+ ) throws IOException
+ {
+ return toolbox.getTaskActionClient().submit(
+ buildPublishAction(
+ segmentsToBeOverwritten,
+ segmentsToPublish,
+ schemaMapping,
+ getTaskLockHelper().getLockTypeToUse()
+ )
+ );
+ }
+ };
+ }
+
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval>
intervals) throws IOException
{
// The given intervals are first converted to align with segment
granularity. This is because,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index eae1f7caf1e..4ea7e9d3dee 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -48,7 +48,6 @@ import
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -866,11 +865,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler, Pe
throw new UOE("[%s] secondary partition type is not supported",
partitionsSpec.getType());
}
- final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
- final TransactionalSegmentPublisher publisher =
- (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) ->
toolbox.getTaskActionClient().submit(
- buildPublishAction(segmentsToBeOverwritten, segmentsToPublish,
map, taskLockType)
- );
+ final TransactionalSegmentPublisher publisher =
buildSegmentPublisher(toolbox);
String effectiveId =
getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 86e31c74a72..34cc71a3a12 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -45,7 +45,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
@@ -1191,12 +1190,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask
}
}
- final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
- final TransactionalSegmentPublisher publisher =
- (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) ->
toolbox.getTaskActionClient().submit(
- buildPublishAction(segmentsToBeOverwritten, segmentsToPublish,
map, taskLockType)
- );
-
+ final TransactionalSegmentPublisher publisher =
buildSegmentPublisher(toolbox);
final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments, newSegments,
annotateFunction, null, segmentSchemaMapping).isSuccess();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 2da858f80cc..f974a1c6c93 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -331,7 +331,7 @@ public class SequenceMetadata<PartitionIdType,
SequenceOffsetType>
}
private class SequenceMetadataTransactionalSegmentPublisher
- implements TransactionalSegmentPublisher
+ extends TransactionalSegmentPublisher
{
private final SeekableStreamIndexTaskRunner<PartitionIdType,
SequenceOffsetType, ?> runner;
private final TaskToolbox toolbox;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 44ce60b5ceb..095e9c3b57d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -150,11 +149,9 @@ public class SegmentTransactionalInsertActionTest
);
Assert.assertEquals(
- SegmentPublishResult.fail(
- InvalidInput.exception(
- "The new start metadata state[ObjectMetadata{theObject=[1]}]
is"
- + " ahead of the last committed end state[null]. Try resetting
the supervisor."
- ).toString()
+ SegmentPublishResult.retryableFailure(
+ "The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+ + " ahead of the last committed end state[null]. Try resetting the
supervisor."
),
result
);
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
index e4bc1645f71..04b745c812e 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
@@ -22,7 +22,7 @@ package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
@@ -34,59 +34,59 @@ import java.util.Objects;
import java.util.Set;
/**
- * Result of an operation that attempts to publish segments. Indicates the set
of segments actually published
- * and whether or not the transaction was a success.
- *
- * If "success" is false then the segments set will be empty.
- *
- * It's possible for the segments set to be empty even if "success" is true,
since the segments set only
- * includes segments actually published as part of the transaction. The
requested segments could have been
- * published by a different transaction (e.g. in the case of replica sets) and
this one would still succeed.
+ * Result of a segment publish operation.
*/
public class SegmentPublishResult
{
private final Set<DataSegment> segments;
private final boolean success;
- @Nullable
+ private final boolean retryable;
private final String errorMsg;
- @Nullable
private final List<PendingSegmentRecord> upgradedPendingSegments;
public static SegmentPublishResult ok(Set<DataSegment> segments)
{
- return new SegmentPublishResult(segments, true, null);
+ return new SegmentPublishResult(segments, true, false, null);
}
public static SegmentPublishResult ok(Set<DataSegment> segments,
List<PendingSegmentRecord> upgradedPendingSegments)
{
- return new SegmentPublishResult(segments, true, null,
upgradedPendingSegments);
+ return new SegmentPublishResult(segments, true, false, null,
upgradedPendingSegments);
}
- public static SegmentPublishResult fail(String errorMsg)
+ public static SegmentPublishResult fail(String errorMsg, Object... args)
{
- return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
+ return new SegmentPublishResult(Set.of(), false, false,
StringUtils.format(errorMsg, args), null);
+ }
+
+ public static SegmentPublishResult retryableFailure(String errorMsg,
Object... args)
+ {
+ return new SegmentPublishResult(Set.of(), false, true,
StringUtils.format(errorMsg, args), null);
}
@JsonCreator
private SegmentPublishResult(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("success") boolean success,
+ @JsonProperty("retryable") boolean retryable,
@JsonProperty("errorMsg") @Nullable String errorMsg
)
{
- this(segments, success, errorMsg, null);
+ this(segments, success, retryable, errorMsg, null);
}
private SegmentPublishResult(
Set<DataSegment> segments,
boolean success,
- @Nullable String errorMsg,
+ boolean retryable,
+ @Nullable String errorMsg,
List<PendingSegmentRecord> upgradedPendingSegments
)
{
this.segments = Preconditions.checkNotNull(segments, "segments");
this.success = success;
this.errorMsg = errorMsg;
+ this.retryable = retryable;
this.upgradedPendingSegments = upgradedPendingSegments;
if (!success) {
@@ -98,6 +98,12 @@ public class SegmentPublishResult
}
}
+ /**
+ * Set of segments published successfully.
+ *
+ * @return Empty set if the publish operation failed or if all the segments
had
+ * already been published by a different transaction.
+ */
@JsonProperty
public Set<DataSegment> getSegments()
{
@@ -117,6 +123,12 @@ public class SegmentPublishResult
return errorMsg;
}
+ @JsonProperty
+ public boolean isRetryable()
+ {
+ return retryable;
+ }
+
@Nullable
public List<PendingSegmentRecord> getUpgradedPendingSegments()
{
@@ -134,6 +146,7 @@ public class SegmentPublishResult
}
SegmentPublishResult that = (SegmentPublishResult) o;
return success == that.success &&
+ retryable == that.retryable &&
Objects.equals(segments, that.segments) &&
Objects.equals(errorMsg, that.errorMsg);
}
@@ -141,7 +154,7 @@ public class SegmentPublishResult
@Override
public int hashCode()
{
- return Objects.hash(segments, success, errorMsg);
+ return Objects.hash(segments, success, errorMsg, retryable);
}
@Override
@@ -150,6 +163,7 @@ public class SegmentPublishResult
return "SegmentPublishResult{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", success=" + success +
+ ", retryable=" + retryable +
", errorMsg='" + errorMsg + '\'' +
'}';
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index ce092f8c696..9a0f91d6afd 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -91,7 +91,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -306,33 +305,22 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String dataSource = segments.iterator().next().getDataSource();
- final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
-
try {
return inReadWriteDatasourceTransaction(
dataSource,
transaction -> {
- // Set definitelyNotUpdated back to false upon retrying.
- definitelyNotUpdated.set(false);
-
+ // Try to update datasource metadata first
if (startMetadata != null) {
- final DataStoreMetadataUpdateResult result =
updateDataSourceMetadataWithHandle(
+ final SegmentPublishResult result =
updateDataSourceMetadataWithHandle(
transaction,
dataSource,
startMetadata,
endMetadata
);
- if (result.isFailed()) {
- // Metadata was definitely not updated.
- transaction.setRollbackOnly();
- definitelyNotUpdated.set(true);
-
- if (result.canRetry()) {
- throw new RetryTransactionException(result.getErrorMsg());
- } else {
- throw InvalidInput.exception(result.getErrorMsg());
- }
+ // Do not proceed if the datasource metadata update failed
+ if (!result.isSuccess()) {
+ return result;
}
}
@@ -347,12 +335,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
catch (CallbackFailedException e) {
- if (definitelyNotUpdated.get()) {
- return SegmentPublishResult.fail(e.getMessage());
- } else {
- // Must throw exception if we are not sure if we updated or not.
- throw e;
- }
+ throw e;
}
}
@@ -468,45 +451,19 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
throw new IllegalArgumentException("end metadata cannot be null");
}
- final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
-
try {
return inReadWriteDatasourceTransaction(
dataSource,
- transaction -> {
- // Set definitelyNotUpdated back to false upon retrying.
- definitelyNotUpdated.set(false);
-
- final DataStoreMetadataUpdateResult result =
updateDataSourceMetadataWithHandle(
- transaction,
- dataSource,
- startMetadata,
- endMetadata
- );
-
- if (result.isFailed()) {
- // Metadata was definitely not updated.
- transaction.setRollbackOnly();
- definitelyNotUpdated.set(true);
-
- if (result.canRetry()) {
- throw new RetryTransactionException(result.getErrorMsg());
- } else {
- throw new RuntimeException(result.getErrorMsg());
- }
- }
-
- return SegmentPublishResult.ok(ImmutableSet.of());
- }
+ transaction -> updateDataSourceMetadataWithHandle(
+ transaction,
+ dataSource,
+ startMetadata,
+ endMetadata
+ )
);
}
catch (CallbackFailedException e) {
- if (definitelyNotUpdated.get()) {
- return SegmentPublishResult.fail(e.getMessage());
- } else {
- // Must throw exception if we are not sure if we updated or not.
- throw e;
- }
+ throw e;
}
}
@@ -1126,25 +1083,18 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
);
- final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
try {
return inReadWriteDatasourceTransaction(
dataSource,
transaction -> {
- metadataNotUpdated.set(false);
-
+ // Try to update datasource metadata first
if (startMetadata != null) {
- final DataStoreMetadataUpdateResult metadataUpdateResult
+ final SegmentPublishResult metadataUpdateResult
= updateDataSourceMetadataWithHandle(transaction,
dataSource, startMetadata, endMetadata);
- if (metadataUpdateResult.isFailed()) {
- transaction.setRollbackOnly();
- metadataNotUpdated.set(true);
- if (metadataUpdateResult.canRetry()) {
- throw new
RetryTransactionException(metadataUpdateResult.getErrorMsg());
- } else {
- throw new
RuntimeException(metadataUpdateResult.getErrorMsg());
- }
+ // Abort the transaction if datasource metadata update has failed
+ if (!metadataUpdateResult.isSuccess()) {
+ return metadataUpdateResult;
}
}
@@ -1172,12 +1122,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
catch (CallbackFailedException e) {
- if (metadataNotUpdated.get()) {
- // Return failed result if metadata was definitely not updated
- return SegmentPublishResult.fail(e.getMessage());
- } else {
- throw e;
- }
+ throw e;
}
}
@@ -2052,7 +1997,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
*
* @throws RuntimeException if state is unknown after this call
*/
- protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataWithHandle(
final SegmentMetadataTransaction transaction,
final String dataSource,
final DataSourceMetadata startMetadata,
@@ -2102,7 +2047,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
      // Offsets stored in startMetadata are greater than the last committed
metadata.
- return DataStoreMetadataUpdateResult.failure(
+ // This can happen because the previous task is still publishing its
segments and can resolve once
+ // the previous task finishes publishing.
+ return SegmentPublishResult.retryableFailure(
"The new start metadata state[%s] is ahead of the last committed"
+ " end state[%s]. Try resetting the supervisor.",
startMetadata, oldCommitMetadataFromDb
@@ -2111,7 +2058,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
- return DataStoreMetadataUpdateResult.failure(
+ return SegmentPublishResult.fail(
"Inconsistency between stored metadata state[%s] and target
state[%s]. Try resetting the supervisor.",
oldCommitMetadataFromDb, startMetadata
);
@@ -2126,7 +2073,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
- final DataStoreMetadataUpdateResult retVal;
+ final SegmentPublishResult retVal;
if (oldCommitMetadataBytesFromDb == null) {
// SELECT -> INSERT can fail due to races; callers must be prepared to
retry.
final int numRows = transaction.getHandle().createStatement(
@@ -2143,8 +2090,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.execute();
retVal = numRows == 1
- ? DataStoreMetadataUpdateResult.SUCCESS
- : DataStoreMetadataUpdateResult.retryableFailure("Failed to insert
metadata for datasource[%s]", dataSource);
+ ? SegmentPublishResult.ok(Set.of())
+ : SegmentPublishResult.retryableFailure("Failed to insert metadata
for datasource[%s]", dataSource);
} else {
// Expecting a particular old metadata; use the SHA1 in a
compare-and-swap UPDATE
final int numRows = transaction.getHandle().createStatement(
@@ -2163,8 +2110,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.execute();
retVal = numRows == 1
- ? DataStoreMetadataUpdateResult.SUCCESS
- : DataStoreMetadataUpdateResult.retryableFailure("Failed to update
metadata for datasource[%s]", dataSource);
+ ? SegmentPublishResult.ok(Set.of())
+ : SegmentPublishResult.retryableFailure("Failed to update metadata
for datasource[%s]", dataSource);
}
if (retVal.isSuccess()) {
@@ -2521,51 +2468,4 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
return transactionFactory.inReadOnlyDatasourceTransaction(dataSource,
callback);
}
-
- public static class DataStoreMetadataUpdateResult
- {
- private final boolean failed;
- private final boolean canRetry;
- private final String errorMsg;
-
- public static final DataStoreMetadataUpdateResult SUCCESS = new
DataStoreMetadataUpdateResult(false, false, null);
-
- public static DataStoreMetadataUpdateResult failure(String errorMsgFormat,
Object... messageArgs)
- {
- return new DataStoreMetadataUpdateResult(true, false, errorMsgFormat,
messageArgs);
- }
-
- public static DataStoreMetadataUpdateResult retryableFailure(String
errorMsgFormat, Object... messageArgs)
- {
- return new DataStoreMetadataUpdateResult(true, true, errorMsgFormat,
messageArgs);
- }
-
- DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable
String errorMsg, Object... errorFormatArgs)
- {
- this.failed = failed;
- this.canRetry = canRetry;
- this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg,
errorFormatArgs);
- }
-
- public boolean isFailed()
- {
- return failed;
- }
-
- public boolean isSuccess()
- {
- return !failed;
- }
-
- public boolean canRetry()
- {
- return canRetry;
- }
-
- @Nullable
- public String getErrorMsg()
- {
- return errorMsg;
- }
- }
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index fa7d037c92c..7ca7fd74c2d 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -719,7 +719,8 @@ public abstract class BaseAppenderatorDriver implements
Closeable
e -> (e != null && e.getMessage() != null
&& e.getMessage().contains("Failed to update the metadata
Store."
+ " The new start metadata is
ahead of last commited end state.")),
- RetryUtils.DEFAULT_MAX_TRIES
+ // Do not retry here since the TransactionalSegmentPublisher
itself performs required retries
+ 1
);
}
catch (Exception e) {
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 390f423fdb5..eb84b50399a 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -20,6 +20,9 @@
package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.timeline.DataSegment;
@@ -28,8 +31,11 @@ import java.io.IOException;
import java.util.Set;
import java.util.function.Function;
-public interface TransactionalSegmentPublisher
+public abstract class TransactionalSegmentPublisher
{
+ private static final int QUIET_RETRIES = 3;
+ private static final int MAX_RETRIES = 5;
+
/**
* Publish segments, along with some commit metadata, in a single
transaction.
*
@@ -40,14 +46,19 @@ public interface TransactionalSegmentPublisher
* @throws IOException if there was an I/O error when publishing
* @throws RuntimeException if we cannot tell if the segments were published
or not, for some other reason
*/
- SegmentPublishResult publishAnnotatedSegments(
+ public abstract SegmentPublishResult publishAnnotatedSegments(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish,
@Nullable Object commitMetadata,
@Nullable SegmentSchemaMapping segmentSchemaMapping
) throws IOException;
- default SegmentPublishResult publishSegments(
+ /**
+ * Applies the given annotate function on the segments and tries to publish
 * them. If the action fails with a retryable failure, it can be retried up to
 * {@link #MAX_RETRIES} times.
+ */
+ public final SegmentPublishResult publishSegments(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish,
Function<Set<DataSegment>, Set<DataSegment>>
outputSegmentsAnnotateFunction,
@@ -57,20 +68,58 @@ public interface TransactionalSegmentPublisher
{
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
outputSegmentsAnnotateFunction
.andThen(SegmentPublisherHelper::annotateShardSpec);
- return publishAnnotatedSegments(
+ final Set<DataSegment> annotatedSegmentsToPublish =
annotateFunction.apply(segmentsToPublish);
+
+ int attemptCount = 0;
+
+ // Retry until success or until max retries are exhausted
+ SegmentPublishResult result = publishAnnotatedSegments(
segmentsToBeOverwritten,
- annotateFunction.apply(segmentsToPublish),
+ annotatedSegmentsToPublish,
commitMetadata,
segmentSchemaMapping
);
+ while (!result.isSuccess() && result.isRetryable() && attemptCount++ <
MAX_RETRIES) {
+ awaitNextRetry(result, attemptCount);
+ result = publishAnnotatedSegments(
+ segmentsToBeOverwritten,
+ annotatedSegmentsToPublish,
+ commitMetadata,
+ segmentSchemaMapping
+ );
+ }
+
+ return result;
}
/**
* @return true if this publisher has action to take when publishing with an
empty segment set.
* The publisher used by the seekable stream tasks is an example
where this is true.
*/
- default boolean supportsEmptyPublish()
+ public boolean supportsEmptyPublish()
{
return false;
}
+
+ /**
+ * Sleeps until the next attempt.
+ */
+ private static void awaitNextRetry(SegmentPublishResult lastResult, int
attemptCount)
+ {
+ try {
+ RetryUtils.awaitNextRetry(
+ new ISE(lastResult.getErrorMsg()),
+ StringUtils.format(
+ "Segment publish failed due to error[%s]",
+ lastResult.getErrorMsg()
+ ),
+ attemptCount,
+ MAX_RETRIES,
+ attemptCount <= QUIET_RETRIES
+ );
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b6578fa9a72..0212c8f2d99 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.StringTuple;
-import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -186,7 +185,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
)
{
@Override
- protected DataStoreMetadataUpdateResult
updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataWithHandle(
SegmentMetadataTransaction transaction,
String dataSource,
DataSourceMetadata startMetadata,
@@ -780,7 +779,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
)
{
@Override
- protected DataStoreMetadataUpdateResult
updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataWithHandle(
SegmentMetadataTransaction transaction,
String dataSource,
DataSourceMetadata startMetadata,
@@ -789,7 +788,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
{
metadataUpdateCounter.getAndIncrement();
if (attemptCounter.getAndIncrement() == 0) {
- return DataStoreMetadataUpdateResult.retryableFailure(null);
+ return SegmentPublishResult.retryableFailure("this failure can be
retried");
} else {
return super.updateDataSourceMetadataWithHandle(transaction,
dataSource, startMetadata, endMetadata);
}
@@ -803,7 +802,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
);
-
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)),
result1);
+ Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure
can be retried"), result1);
+
+ final SegmentPublishResult resultOnRetry =
failOnceCoordinator.commitSegmentsAndMetadata(
+ ImmutableSet.of(defaultSegment),
+ new ObjectMetadata(null),
+ new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+ new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
+ );
+
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)),
resultOnRetry);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8),
@@ -825,7 +832,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
new ObjectMetadata(ImmutableMap.of("foo", "baz")),
new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
);
-
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)),
result2);
+ Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure
can be retried"), result2);
+
+ final SegmentPublishResult resultOnRetry2 =
failOnceCoordinator.commitSegmentsAndMetadata(
+ ImmutableSet.of(defaultSegment2),
+ new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+ new ObjectMetadata(ImmutableMap.of("foo", "baz")),
+ new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
+ );
+
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)),
resultOnRetry2);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8),
@@ -857,11 +872,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
);
Assert.assertEquals(
- SegmentPublishResult.fail(
- InvalidInput.exception(
- "The new start metadata
state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed"
- + " end state[null]. Try resetting the supervisor."
- ).toString()),
+ SegmentPublishResult.retryableFailure(
+ "The new start metadata state[ObjectMetadata{theObject={foo=bar}}]
is ahead of the last committed"
+ + " end state[null]. Try resetting the supervisor."
+ ),
result1
);
@@ -888,10 +902,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
);
Assert.assertEquals(
SegmentPublishResult.fail(
- InvalidInput.exception(
- "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}]"
- + " and target state[ObjectMetadata{theObject=null}]. Try
resetting the supervisor."
- ).toString()
+ "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}]"
+ + " and target state[ObjectMetadata{theObject=null}]. Try
resetting the supervisor."
),
result2
);
@@ -972,10 +984,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
);
Assert.assertEquals(
SegmentPublishResult.fail(
- InvalidInput.exception(
- "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}] and "
- + "target state[ObjectMetadata{theObject={foo=qux}}]. Try
resetting the supervisor."
- ).toString()),
+ "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}] and "
+ + "target state[ObjectMetadata{theObject={foo=qux}}]. Try
resetting the supervisor."
+ ),
result2
);
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index 9bed46b2c3a..c72e1a75afe 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -108,7 +108,7 @@ public class
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
)
{
@Override
- protected DataStoreMetadataUpdateResult
updateDataSourceMetadataWithHandle(
+ protected SegmentPublishResult updateDataSourceMetadataWithHandle(
SegmentMetadataTransaction transaction,
String dataSource,
DataSourceMetadata startMetadata,
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 085958c5f9f..a071ff7af7d 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -29,9 +29,11 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.loading.DataSegmentKiller;
import
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
import
org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -41,11 +43,13 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -199,7 +203,19 @@ public class BatchAppenderatorDriverTest extends
EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata,
schema) -> SegmentPublishResult.ok(ImmutableSet.of());
+ return new TransactionalSegmentPublisher()
+ {
+ @Override
+ public SegmentPublishResult publishAnnotatedSegments(
+ @Nullable Set<DataSegment> segmentsToBeOverwritten,
+ Set<DataSegment> segmentsToPublish,
+ @Nullable Object commitMetadata,
+ @Nullable SegmentSchemaMapping segmentSchemaMapping
+ )
+ {
+ return SegmentPublishResult.ok(Set.of());
+ }
+ };
}
static class TestSegmentAllocator implements SegmentAllocator
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 27aadaee574..4bf3a8dc22b 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
@@ -54,9 +55,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -68,6 +69,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
public class StreamAppenderatorDriverTest extends EasyMockSupport
{
@@ -411,13 +413,14 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata,
segmentSchemaMapping) ->
- SegmentPublishResult.ok(Collections.emptySet());
+ return makePublisher(
+ (segmentsToPublish) -> SegmentPublishResult.ok(Set.of())
+ );
}
private TransactionalSegmentPublisher makeUpgradingPublisher()
{
- return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata,
segmentSchemaMapping) -> {
+ return makePublisher((segmentsToPublish) -> {
Set<DataSegment> allSegments = new HashSet<>(segmentsToPublish);
int id = 0;
for (DataSegment segment : segmentsToPublish) {
@@ -435,17 +438,36 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
allSegments.add(upgradedSegment);
}
return SegmentPublishResult.ok(allSegments);
- };
+ });
}
static TransactionalSegmentPublisher makeFailingPublisher(boolean
failWithException)
{
- return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata,
segmentSchemaMapping) -> {
+ return makePublisher((segmentsToPublish) -> {
final RuntimeException exception = new RuntimeException("test");
if (failWithException) {
throw exception;
}
return SegmentPublishResult.fail(exception.getMessage());
+ });
+ }
+
+ private static TransactionalSegmentPublisher makePublisher(
+ Function<Set<DataSegment>, SegmentPublishResult> publishFunction
+ )
+ {
+ return new TransactionalSegmentPublisher()
+ {
+ @Override
+ public SegmentPublishResult publishAnnotatedSegments(
+ @Nullable Set<DataSegment> segmentsToBeOverwritten,
+ Set<DataSegment> segmentsToPublish,
+ @Nullable Object commitMetadata,
+ @Nullable SegmentSchemaMapping segmentSchemaMapping
+ )
+ {
+ return publishFunction.apply(segmentsToPublish);
+ }
};
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
new file mode 100644
index 00000000000..884b475893d
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class TransactionalSegmentPublisherTest
+{
+ @Test(timeout = 60_000L)
+ public void testPublishSegments_retriesUpto5Times_ifFailureIsRetryable()
throws IOException
+ {
+ final AtomicInteger attemptCount = new AtomicInteger(0);
+ final TransactionalSegmentPublisher publisher = createPublisher(
+ SegmentPublishResult.retryableFailure("this error is retryable"),
+ attemptCount
+ );
+
+ Assert.assertEquals(
+ SegmentPublishResult.retryableFailure("this error is retryable"),
+ publisher.publishSegments(null, Set.of(), Function.identity(), null,
null)
+ );
+ Assert.assertEquals(6, attemptCount.get());
+ }
+
+ @Test
+ public void testPublishSegments_doesNotRetry_ifFailureIsNotRetryable()
throws IOException
+ {
+ final AtomicInteger attemptCount = new AtomicInteger(0);
+ final TransactionalSegmentPublisher publisher = createPublisher(
+ SegmentPublishResult.fail("this error is not retryable"),
+ attemptCount
+ );
+
+ Assert.assertEquals(
+ SegmentPublishResult.fail("this error is not retryable"),
+ publisher.publishSegments(null, Set.of(), Function.identity(), null,
null)
+ );
+ Assert.assertEquals(1, attemptCount.get());
+ }
+
+ @Test
+ public void testPublishAnnotatedSegments_doesNotRetry() throws Exception
+ {
+ final AtomicInteger attemptCount = new AtomicInteger(0);
+ final TransactionalSegmentPublisher publisher = createPublisher(
+ SegmentPublishResult.retryableFailure("this error is retryable"),
+ attemptCount
+ );
+
+ Assert.assertEquals(
+ SegmentPublishResult.retryableFailure("this error is retryable"),
+ publisher.publishAnnotatedSegments(null, Set.of(), null, null)
+ );
+ Assert.assertEquals(1, attemptCount.get());
+ }
+
+ private TransactionalSegmentPublisher createPublisher(
+ SegmentPublishResult publishResult,
+ AtomicInteger attemptCount
+ )
+ {
+ return new TransactionalSegmentPublisher()
+ {
+ @Override
+ public SegmentPublishResult publishAnnotatedSegments(
+ @Nullable Set<DataSegment> segmentsToBeOverwritten,
+ Set<DataSegment> segmentsToPublish,
+ @Nullable Object commitMetadata,
+ @Nullable SegmentSchemaMapping segmentSchemaMapping
+ )
+ {
+ attemptCount.incrementAndGet();
+ return publishResult;
+ }
+ };
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]