This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b46be6eb22d Retry segment publish task actions without holding locks (#17816)
b46be6eb22d is described below

commit b46be6eb22dfa9ee671b07e279744d9b6a2eb7bd
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Mar 24 14:04:13 2025 +0530

    Retry segment publish task actions without holding locks (#17816)
    
    #17802 reverted a retry of failed segment publish actions.
    
    This patch attempts to address the original issue by retrying the segment
    publish task actions on the client (i.e. task) side without holding any
    locks so that other transactions are not blocked.
    
    Changes:
    
      - Add retries to TransactionalSegmentPublisher
      - Add field retryable to SegmentPublishResult
      - Remove class DataStoreMetadataUpdateResult and use SegmentPublishResult
        instead
---
 .../common/task/AbstractBatchIndexTask.java        |  25 ++++
 .../druid/indexing/common/task/IndexTask.java      |   7 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   8 +-
 .../indexing/seekablestream/SequenceMetadata.java  |   2 +-
 .../SegmentTransactionalInsertActionTest.java      |   9 +-
 .../indexing/overlord/SegmentPublishResult.java    |  50 ++++---
 .../IndexerSQLMetadataStorageCoordinator.java      | 158 ++++-----------------
 .../appenderator/BaseAppenderatorDriver.java       |   3 +-
 .../TransactionalSegmentPublisher.java             |  61 +++++++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  49 ++++---
 ...ataStorageCoordinatorSchemaPersistenceTest.java |   2 +-
 .../appenderator/BatchAppenderatorDriverTest.java  |  18 ++-
 .../appenderator/StreamAppenderatorDriverTest.java |  34 ++++-
 .../TransactionalSegmentPublisherTest.java         | 104 ++++++++++++++
 14 files changed, 329 insertions(+), 201 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 38bc961dc47..ec9c12a14fe 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -76,6 +76,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.IngestionSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -456,6 +457,30 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     }
   }
 
+  protected TransactionalSegmentPublisher buildSegmentPublisher(TaskToolbox 
toolbox)
+  {
+    return new TransactionalSegmentPublisher()
+    {
+      @Override
+      public SegmentPublishResult publishAnnotatedSegments(
+          @Nullable Set<DataSegment> segmentsToBeOverwritten,
+          Set<DataSegment> segmentsToPublish,
+          @Nullable Object commitMetadata,
+          @Nullable SegmentSchemaMapping schemaMapping
+      ) throws IOException
+      {
+        return toolbox.getTaskActionClient().submit(
+            buildPublishAction(
+                segmentsToBeOverwritten,
+                segmentsToPublish,
+                schemaMapping,
+                getTaskLockHelper().getLockTypeToUse()
+            )
+        );
+      }
+    };
+  }
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> 
intervals) throws IOException
   {
     // The given intervals are first converted to align with segment 
granularity. This is because,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index eae1f7caf1e..4ea7e9d3dee 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -48,7 +48,6 @@ import 
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -866,11 +865,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler, Pe
         throw new UOE("[%s] secondary partition type is not supported", 
partitionsSpec.getType());
     }
 
-    final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
-    final TransactionalSegmentPublisher publisher =
-        (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> 
toolbox.getTaskActionClient().submit(
-            buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, 
map, taskLockType)
-        );
+    final TransactionalSegmentPublisher publisher = 
buildSegmentPublisher(toolbox);
 
     String effectiveId = 
getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
     if (effectiveId == null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 86e31c74a72..34cc71a3a12 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -45,7 +45,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.report.IngestionStatsAndErrors;
 import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
@@ -1191,12 +1190,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask
       }
     }
 
-    final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
-    final TransactionalSegmentPublisher publisher =
-        (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> 
toolbox.getTaskActionClient().submit(
-            buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, 
map, taskLockType)
-        );
-
+    final TransactionalSegmentPublisher publisher = 
buildSegmentPublisher(toolbox);
     final boolean published =
         newSegments.isEmpty()
         || publisher.publishSegments(oldSegments, newSegments, 
annotateFunction, null, segmentSchemaMapping).isSuccess();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 2da858f80cc..f974a1c6c93 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -331,7 +331,7 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
   }
 
   private class SequenceMetadataTransactionalSegmentPublisher
-      implements TransactionalSegmentPublisher
+      extends TransactionalSegmentPublisher
   {
     private final SeekableStreamIndexTaskRunner<PartitionIdType, 
SequenceOffsetType, ?> runner;
     private final TaskToolbox toolbox;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 44ce60b5ceb..095e9c3b57d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.actions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -150,11 +149,9 @@ public class SegmentTransactionalInsertActionTest
     );
 
     Assert.assertEquals(
-        SegmentPublishResult.fail(
-            InvalidInput.exception(
-                "The new start metadata state[ObjectMetadata{theObject=[1]}] 
is"
-                + " ahead of the last committed end state[null]. Try resetting 
the supervisor."
-            ).toString()
+        SegmentPublishResult.retryableFailure(
+            "The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+            + " ahead of the last committed end state[null]. Try resetting the 
supervisor."
         ),
         result
     );
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
index e4bc1645f71..04b745c812e 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
@@ -22,7 +22,7 @@ package org.apache.druid.indexing.overlord;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
@@ -34,59 +34,59 @@ import java.util.Objects;
 import java.util.Set;
 
 /**
- * Result of an operation that attempts to publish segments. Indicates the set 
of segments actually published
- * and whether or not the transaction was a success.
- *
- * If "success" is false then the segments set will be empty.
- *
- * It's possible for the segments set to be empty even if "success" is true, 
since the segments set only
- * includes segments actually published as part of the transaction. The 
requested segments could have been
- * published by a different transaction (e.g. in the case of replica sets) and 
this one would still succeed.
+ * Result of a segment publish operation.
  */
 public class SegmentPublishResult
 {
   private final Set<DataSegment> segments;
   private final boolean success;
-  @Nullable
+  private final boolean retryable;
   private final String errorMsg;
-  @Nullable
   private final List<PendingSegmentRecord> upgradedPendingSegments;
 
   public static SegmentPublishResult ok(Set<DataSegment> segments)
   {
-    return new SegmentPublishResult(segments, true, null);
+    return new SegmentPublishResult(segments, true, false, null);
   }
 
   public static SegmentPublishResult ok(Set<DataSegment> segments, 
List<PendingSegmentRecord> upgradedPendingSegments)
   {
-    return new SegmentPublishResult(segments, true, null, 
upgradedPendingSegments);
+    return new SegmentPublishResult(segments, true, false, null, 
upgradedPendingSegments);
   }
 
-  public static SegmentPublishResult fail(String errorMsg)
+  public static SegmentPublishResult fail(String errorMsg, Object... args)
   {
-    return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
+    return new SegmentPublishResult(Set.of(), false, false, 
StringUtils.format(errorMsg, args), null);
+  }
+
+  public static SegmentPublishResult retryableFailure(String errorMsg, 
Object... args)
+  {
+    return new SegmentPublishResult(Set.of(), false, true, 
StringUtils.format(errorMsg, args), null);
   }
 
   @JsonCreator
   private SegmentPublishResult(
       @JsonProperty("segments") Set<DataSegment> segments,
       @JsonProperty("success") boolean success,
+      @JsonProperty("retryable") boolean retryable,
       @JsonProperty("errorMsg") @Nullable String errorMsg
   )
   {
-    this(segments, success, errorMsg, null);
+    this(segments, success, retryable, errorMsg, null);
   }
 
   private SegmentPublishResult(
       Set<DataSegment> segments,
       boolean success,
-       @Nullable String errorMsg,
+      boolean retryable,
+      @Nullable String errorMsg,
       List<PendingSegmentRecord> upgradedPendingSegments
   )
   {
     this.segments = Preconditions.checkNotNull(segments, "segments");
     this.success = success;
     this.errorMsg = errorMsg;
+    this.retryable = retryable;
     this.upgradedPendingSegments = upgradedPendingSegments;
 
     if (!success) {
@@ -98,6 +98,12 @@ public class SegmentPublishResult
     }
   }
 
+  /**
+   * Set of segments published successfully.
+   *
+   * @return Empty set if the publish operation failed or if all the segments 
had
+   * already been published by a different transaction.
+   */
   @JsonProperty
   public Set<DataSegment> getSegments()
   {
@@ -117,6 +123,12 @@ public class SegmentPublishResult
     return errorMsg;
   }
 
+  @JsonProperty
+  public boolean isRetryable()
+  {
+    return retryable;
+  }
+
   @Nullable
   public List<PendingSegmentRecord> getUpgradedPendingSegments()
   {
@@ -134,6 +146,7 @@ public class SegmentPublishResult
     }
     SegmentPublishResult that = (SegmentPublishResult) o;
     return success == that.success &&
+           retryable == that.retryable &&
            Objects.equals(segments, that.segments) &&
            Objects.equals(errorMsg, that.errorMsg);
   }
@@ -141,7 +154,7 @@ public class SegmentPublishResult
   @Override
   public int hashCode()
   {
-    return Objects.hash(segments, success, errorMsg);
+    return Objects.hash(segments, success, errorMsg, retryable);
   }
 
   @Override
@@ -150,6 +163,7 @@ public class SegmentPublishResult
     return "SegmentPublishResult{" +
            "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
            ", success=" + success +
+           ", retryable=" + retryable +
            ", errorMsg='" + errorMsg + '\'' +
            '}';
   }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index ce092f8c696..9a0f91d6afd 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -91,7 +91,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -306,33 +305,22 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     final String dataSource = segments.iterator().next().getDataSource();
 
-    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
-
     try {
       return inReadWriteDatasourceTransaction(
           dataSource,
           transaction -> {
-            // Set definitelyNotUpdated back to false upon retrying.
-            definitelyNotUpdated.set(false);
-
+            // Try to update datasource metadata first
             if (startMetadata != null) {
-              final DataStoreMetadataUpdateResult result = 
updateDataSourceMetadataWithHandle(
+              final SegmentPublishResult result = 
updateDataSourceMetadataWithHandle(
                   transaction,
                   dataSource,
                   startMetadata,
                   endMetadata
               );
 
-              if (result.isFailed()) {
-                // Metadata was definitely not updated.
-                transaction.setRollbackOnly();
-                definitelyNotUpdated.set(true);
-
-                if (result.canRetry()) {
-                  throw new RetryTransactionException(result.getErrorMsg());
-                } else {
-                  throw InvalidInput.exception(result.getErrorMsg());
-                }
+              // Do not proceed if the datasource metadata update failed
+              if (!result.isSuccess()) {
+                return result;
               }
             }
 
@@ -347,12 +335,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       );
     }
     catch (CallbackFailedException e) {
-      if (definitelyNotUpdated.get()) {
-        return SegmentPublishResult.fail(e.getMessage());
-      } else {
-        // Must throw exception if we are not sure if we updated or not.
-        throw e;
-      }
+      throw e;
     }
   }
 
@@ -468,45 +451,19 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       throw new IllegalArgumentException("end metadata cannot be null");
     }
 
-    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
-
     try {
       return inReadWriteDatasourceTransaction(
           dataSource,
-          transaction -> {
-            // Set definitelyNotUpdated back to false upon retrying.
-            definitelyNotUpdated.set(false);
-
-            final DataStoreMetadataUpdateResult result = 
updateDataSourceMetadataWithHandle(
-                transaction,
-                dataSource,
-                startMetadata,
-                endMetadata
-            );
-
-            if (result.isFailed()) {
-              // Metadata was definitely not updated.
-              transaction.setRollbackOnly();
-              definitelyNotUpdated.set(true);
-
-              if (result.canRetry()) {
-                throw new RetryTransactionException(result.getErrorMsg());
-              } else {
-                throw new RuntimeException(result.getErrorMsg());
-              }
-            }
-
-            return SegmentPublishResult.ok(ImmutableSet.of());
-          }
+          transaction -> updateDataSourceMetadataWithHandle(
+              transaction,
+              dataSource,
+              startMetadata,
+              endMetadata
+          )
       );
     }
     catch (CallbackFailedException e) {
-      if (definitelyNotUpdated.get()) {
-        return SegmentPublishResult.fail(e.getMessage());
-      } else {
-        // Must throw exception if we are not sure if we updated or not.
-        throw e;
-      }
+      throw e;
     }
   }
 
@@ -1126,25 +1083,18 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         }
     );
 
-    final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
     try {
       return inReadWriteDatasourceTransaction(
           dataSource,
           transaction -> {
-            metadataNotUpdated.set(false);
-
+            // Try to update datasource metadata first
             if (startMetadata != null) {
-              final DataStoreMetadataUpdateResult metadataUpdateResult
+              final SegmentPublishResult metadataUpdateResult
                   = updateDataSourceMetadataWithHandle(transaction, 
dataSource, startMetadata, endMetadata);
 
-              if (metadataUpdateResult.isFailed()) {
-                transaction.setRollbackOnly();
-                metadataNotUpdated.set(true);
-                if (metadataUpdateResult.canRetry()) {
-                  throw new 
RetryTransactionException(metadataUpdateResult.getErrorMsg());
-                } else {
-                  throw new 
RuntimeException(metadataUpdateResult.getErrorMsg());
-                }
+              // Abort the transaction if datasource metadata update has failed
+              if (!metadataUpdateResult.isSuccess()) {
+                return metadataUpdateResult;
               }
             }
 
@@ -1172,12 +1122,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       );
     }
     catch (CallbackFailedException e) {
-      if (metadataNotUpdated.get()) {
-        // Return failed result if metadata was definitely not updated
-        return SegmentPublishResult.fail(e.getMessage());
-      } else {
-        throw e;
-      }
+      throw e;
     }
   }
 
@@ -2052,7 +1997,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
    *
    * @throws RuntimeException if state is unknown after this call
    */
-  protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
+  protected SegmentPublishResult updateDataSourceMetadataWithHandle(
       final SegmentMetadataTransaction transaction,
       final String dataSource,
       final DataSourceMetadata startMetadata,
@@ -2102,7 +2047,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
       // Offsets stored in startMetadata is greater than the last commited 
metadata.
-      return DataStoreMetadataUpdateResult.failure(
+      // This can happen because the previous task is still publishing its 
segments and can resolve once
+      // the previous task finishes publishing.
+      return SegmentPublishResult.retryableFailure(
           "The new start metadata state[%s] is ahead of the last committed"
           + " end state[%s]. Try resetting the supervisor.",
           startMetadata, oldCommitMetadataFromDb
@@ -2111,7 +2058,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
-      return DataStoreMetadataUpdateResult.failure(
+      return SegmentPublishResult.fail(
           "Inconsistency between stored metadata state[%s] and target 
state[%s]. Try resetting the supervisor.",
           oldCommitMetadataFromDb, startMetadata
       );
@@ -2126,7 +2073,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
     );
 
-    final DataStoreMetadataUpdateResult retVal;
+    final SegmentPublishResult retVal;
     if (oldCommitMetadataBytesFromDb == null) {
       // SELECT -> INSERT can fail due to races; callers must be prepared to 
retry.
       final int numRows = transaction.getHandle().createStatement(
@@ -2143,8 +2090,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                                 .execute();
 
       retVal = numRows == 1
-          ? DataStoreMetadataUpdateResult.SUCCESS
-          : DataStoreMetadataUpdateResult.retryableFailure("Failed to insert 
metadata for datasource[%s]", dataSource);
+          ? SegmentPublishResult.ok(Set.of())
+          : SegmentPublishResult.retryableFailure("Failed to insert metadata 
for datasource[%s]", dataSource);
     } else {
       // Expecting a particular old metadata; use the SHA1 in a 
compare-and-swap UPDATE
       final int numRows = transaction.getHandle().createStatement(
@@ -2163,8 +2110,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                                 .execute();
 
       retVal = numRows == 1
-          ? DataStoreMetadataUpdateResult.SUCCESS
-          : DataStoreMetadataUpdateResult.retryableFailure("Failed to update 
metadata for datasource[%s]", dataSource);
+          ? SegmentPublishResult.ok(Set.of())
+          : SegmentPublishResult.retryableFailure("Failed to update metadata 
for datasource[%s]", dataSource);
     }
 
     if (retVal.isSuccess()) {
@@ -2521,51 +2468,4 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   {
     return transactionFactory.inReadOnlyDatasourceTransaction(dataSource, 
callback);
   }
-
-  public static class DataStoreMetadataUpdateResult
-  {
-    private final boolean failed;
-    private final boolean canRetry;
-    private final String errorMsg;
-
-    public static final DataStoreMetadataUpdateResult SUCCESS = new 
DataStoreMetadataUpdateResult(false, false, null);
-
-    public static DataStoreMetadataUpdateResult failure(String errorMsgFormat, 
Object... messageArgs)
-    {
-      return new DataStoreMetadataUpdateResult(true, false, errorMsgFormat, 
messageArgs);
-    }
-
-    public static DataStoreMetadataUpdateResult retryableFailure(String 
errorMsgFormat, Object... messageArgs)
-    {
-      return new DataStoreMetadataUpdateResult(true, true, errorMsgFormat, 
messageArgs);
-    }
-
-    DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable 
String errorMsg, Object... errorFormatArgs)
-    {
-      this.failed = failed;
-      this.canRetry = canRetry;
-      this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg, 
errorFormatArgs);
-    }
-
-    public boolean isFailed()
-    {
-      return failed;
-    }
-
-    public boolean isSuccess()
-    {
-      return !failed;
-    }
-
-    public boolean canRetry()
-    {
-      return canRetry;
-    }
-
-    @Nullable
-    public String getErrorMsg()
-    {
-      return errorMsg;
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index fa7d037c92c..7ca7fd74c2d 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -719,7 +719,8 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
               e -> (e != null && e.getMessage() != null
                     && e.getMessage().contains("Failed to update the metadata 
Store."
                                                + " The new start metadata is 
ahead of last commited end state.")),
-              RetryUtils.DEFAULT_MAX_TRIES
+              // Do not retry here since the TransactionalSegmentPublisher 
itself performs required retries
+              1
           );
         }
         catch (Exception e) {
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 390f423fdb5..eb84b50399a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -20,6 +20,9 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.timeline.DataSegment;
 
@@ -28,8 +31,11 @@ import java.io.IOException;
 import java.util.Set;
 import java.util.function.Function;
 
-public interface TransactionalSegmentPublisher
+public abstract class TransactionalSegmentPublisher
 {
+  private static final int QUIET_RETRIES = 3;
+  private static final int MAX_RETRIES = 5;
+
   /**
    * Publish segments, along with some commit metadata, in a single 
transaction.
    *
@@ -40,14 +46,19 @@ public interface TransactionalSegmentPublisher
    * @throws IOException if there was an I/O error when publishing
    * @throws RuntimeException if we cannot tell if the segments were published 
or not, for some other reason
    */
-  SegmentPublishResult publishAnnotatedSegments(
+  public abstract SegmentPublishResult publishAnnotatedSegments(
       @Nullable Set<DataSegment> segmentsToBeOverwritten,
       Set<DataSegment> segmentsToPublish,
       @Nullable Object commitMetadata,
       @Nullable SegmentSchemaMapping segmentSchemaMapping
   ) throws IOException;
 
-  default SegmentPublishResult publishSegments(
+  /**
+   * Applies the given annotate function on the segments and tries to publish
+   * them. If the action fails with a retryable failure, it can be retried upto
+   * {@link #MAX_RETRIES} times.
+   */
+  public final SegmentPublishResult publishSegments(
       @Nullable Set<DataSegment> segmentsToBeOverwritten,
       Set<DataSegment> segmentsToPublish,
       Function<Set<DataSegment>, Set<DataSegment>> 
outputSegmentsAnnotateFunction,
@@ -57,20 +68,58 @@ public interface TransactionalSegmentPublisher
   {
     final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = 
outputSegmentsAnnotateFunction
         .andThen(SegmentPublisherHelper::annotateShardSpec);
-    return publishAnnotatedSegments(
+    final Set<DataSegment> annotatedSegmentsToPublish = 
annotateFunction.apply(segmentsToPublish);
+
+    int attemptCount = 0;
+
+    // Retry until success or until max retries are exhausted
+    SegmentPublishResult result = publishAnnotatedSegments(
         segmentsToBeOverwritten,
-        annotateFunction.apply(segmentsToPublish),
+        annotatedSegmentsToPublish,
         commitMetadata,
         segmentSchemaMapping
     );
+    while (!result.isSuccess() && result.isRetryable() && attemptCount++ < 
MAX_RETRIES) {
+      awaitNextRetry(result, attemptCount);
+      result = publishAnnotatedSegments(
+          segmentsToBeOverwritten,
+          annotatedSegmentsToPublish,
+          commitMetadata,
+          segmentSchemaMapping
+      );
+    }
+
+    return result;
   }
 
   /**
    * @return true if this publisher has action to take when publishing with an 
empty segment set.
    *         The publisher used by the seekable stream tasks is an example 
where this is true.
    */
-  default boolean supportsEmptyPublish()
+  public boolean supportsEmptyPublish()
   {
     return false;
   }
+
+  /**
+   * Sleeps until the next attempt.
+   */
+  private static void awaitNextRetry(SegmentPublishResult lastResult, int 
attemptCount)
+  {
+    try {
+      RetryUtils.awaitNextRetry(
+          new ISE(lastResult.getErrorMsg()),
+          StringUtils.format(
+              "Segment publish failed due to error[%s]",
+              lastResult.getErrorMsg()
+          ),
+          attemptCount,
+          MAX_RETRIES,
+          attemptCount <= QUIET_RETRIES
+      );
+    }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b6578fa9a72..0212c8f2d99 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.StringTuple;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -186,7 +185,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     )
     {
       @Override
-      protected DataStoreMetadataUpdateResult 
updateDataSourceMetadataWithHandle(
+      protected SegmentPublishResult updateDataSourceMetadataWithHandle(
           SegmentMetadataTransaction transaction,
           String dataSource,
           DataSourceMetadata startMetadata,
@@ -780,7 +779,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     )
     {
       @Override
-      protected DataStoreMetadataUpdateResult 
updateDataSourceMetadataWithHandle(
+      protected SegmentPublishResult updateDataSourceMetadataWithHandle(
           SegmentMetadataTransaction transaction,
           String dataSource,
           DataSourceMetadata startMetadata,
@@ -789,7 +788,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
       {
         metadataUpdateCounter.getAndIncrement();
         if (attemptCounter.getAndIncrement() == 0) {
-          return DataStoreMetadataUpdateResult.retryableFailure(null);
+          return SegmentPublishResult.retryableFailure("this failure can be 
retried");
         } else {
           return super.updateDataSourceMetadataWithHandle(transaction, 
dataSource, startMetadata, endMetadata);
         }
@@ -803,7 +802,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
     );
-    
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), 
result1);
+    Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure 
can be retried"), result1);
+
+    final SegmentPublishResult resultOnRetry = 
failOnceCoordinator.commitSegmentsAndMetadata(
+        ImmutableSet.of(defaultSegment),
+        new ObjectMetadata(null),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
+    );
+    
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), 
resultOnRetry);
 
     Assert.assertArrayEquals(
         
mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8),
@@ -825,7 +832,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         new ObjectMetadata(ImmutableMap.of("foo", "baz")),
         new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
     );
-    
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), 
result2);
+    Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure 
can be retried"), result2);
+
+    final SegmentPublishResult resultOnRetry2 = 
failOnceCoordinator.commitSegmentsAndMetadata(
+        ImmutableSet.of(defaultSegment2),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        new ObjectMetadata(ImmutableMap.of("foo", "baz")),
+        new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
+    );
+    
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), 
resultOnRetry2);
 
     Assert.assertArrayEquals(
         
mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8),
@@ -857,11 +872,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
     );
     Assert.assertEquals(
-        SegmentPublishResult.fail(
-            InvalidInput.exception(
-                "The new start metadata 
state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed"
-                + " end state[null]. Try resetting the supervisor."
-            ).toString()),
+        SegmentPublishResult.retryableFailure(
+            "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] 
is ahead of the last committed"
+            + " end state[null]. Try resetting the supervisor."
+        ),
         result1
     );
 
@@ -888,10 +902,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     );
     Assert.assertEquals(
         SegmentPublishResult.fail(
-            InvalidInput.exception(
-                "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}]"
-                + " and target state[ObjectMetadata{theObject=null}]. Try 
resetting the supervisor."
-            ).toString()
+            "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}]"
+            + " and target state[ObjectMetadata{theObject=null}]. Try 
resetting the supervisor."
         ),
         result2
     );
@@ -972,10 +984,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     );
     Assert.assertEquals(
         SegmentPublishResult.fail(
-            InvalidInput.exception(
-                "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}] and "
-                + "target state[ObjectMetadata{theObject={foo=qux}}]. Try 
resetting the supervisor."
-            ).toString()),
+            "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}] and "
+            + "target state[ObjectMetadata{theObject={foo=qux}}]. Try 
resetting the supervisor."
+        ),
         result2
     );
 
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index 9bed46b2c3a..c72e1a75afe 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -108,7 +108,7 @@ public class 
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
     )
     {
       @Override
-      protected DataStoreMetadataUpdateResult 
updateDataSourceMetadataWithHandle(
+      protected SegmentPublishResult updateDataSourceMetadataWithHandle(
           SegmentMetadataTransaction transaction,
           String dataSource,
           DataSourceMetadata startMetadata,
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 085958c5f9f..a071ff7af7d 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -29,9 +29,11 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -41,11 +43,13 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -199,7 +203,19 @@ public class BatchAppenderatorDriverTest extends 
EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, 
schema) -> SegmentPublishResult.ok(ImmutableSet.of());
+    return new TransactionalSegmentPublisher()
+    {
+      @Override
+      public SegmentPublishResult publishAnnotatedSegments(
+          @Nullable Set<DataSegment> segmentsToBeOverwritten,
+          Set<DataSegment> segmentsToPublish,
+          @Nullable Object commitMetadata,
+          @Nullable SegmentSchemaMapping segmentSchemaMapping
+      )
+      {
+        return SegmentPublishResult.ok(Set.of());
+      }
+    };
   }
 
   static class TestSegmentAllocator implements SegmentAllocator
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 27aadaee574..4bf3a8dc22b 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.loading.DataSegmentKiller;
@@ -54,9 +55,9 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -68,6 +69,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 
 public class StreamAppenderatorDriverTest extends EasyMockSupport
 {
@@ -411,13 +413,14 @@ public class StreamAppenderatorDriverTest extends 
EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, 
segmentSchemaMapping) ->
-        SegmentPublishResult.ok(Collections.emptySet());
+    return makePublisher(
+        (segmentsToPublish) -> SegmentPublishResult.ok(Set.of())
+    );
   }
 
   private TransactionalSegmentPublisher makeUpgradingPublisher()
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, 
segmentSchemaMapping) -> {
+    return makePublisher((segmentsToPublish) -> {
       Set<DataSegment> allSegments = new HashSet<>(segmentsToPublish);
       int id = 0;
       for (DataSegment segment : segmentsToPublish) {
@@ -435,17 +438,36 @@ public class StreamAppenderatorDriverTest extends 
EasyMockSupport
         allSegments.add(upgradedSegment);
       }
       return SegmentPublishResult.ok(allSegments);
-    };
+    });
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean 
failWithException)
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, 
segmentSchemaMapping) -> {
+    return makePublisher((segmentsToPublish) -> {
       final RuntimeException exception = new RuntimeException("test");
       if (failWithException) {
         throw exception;
       }
       return SegmentPublishResult.fail(exception.getMessage());
+    });
+  }
+
+  private static TransactionalSegmentPublisher makePublisher(
+      Function<Set<DataSegment>, SegmentPublishResult> publishFunction  // maps the segments to publish to the result to return
+  )
+  {
+    return new TransactionalSegmentPublisher()
+    {
+      @Override
+      public SegmentPublishResult publishAnnotatedSegments(
+          @Nullable Set<DataSegment> segmentsToBeOverwritten,
+          Set<DataSegment> segmentsToPublish,
+          @Nullable Object commitMetadata,
+          @Nullable SegmentSchemaMapping segmentSchemaMapping
+      )
+      {
+        return publishFunction.apply(segmentsToPublish);  // overwrite set, commit metadata and schema mapping are ignored
+      }
     };
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
new file mode 100644
index 00000000000..884b475893d
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class TransactionalSegmentPublisherTest
+{
+  @Test(timeout = 60_000L)
+  public void testPublishSegments_retriesUpto5Times_ifFailureIsRetryable() 
throws IOException
+  {
+    final AtomicInteger attemptCount = new AtomicInteger(0);
+    final TransactionalSegmentPublisher publisher = createPublisher(
+        SegmentPublishResult.retryableFailure("this error is retryable"),
+        attemptCount
+    );
+    // The publisher always returns the same retryable failure, so the last failure is what comes back.
+    Assert.assertEquals(
+        SegmentPublishResult.retryableFailure("this error is retryable"),
+        publisher.publishSegments(null, Set.of(), Function.identity(), null, 
null)
+    );
+    Assert.assertEquals(6, attemptCount.get());  // 1 initial attempt + 5 retries (per the test name)
+  }
+
+  @Test
+  public void testPublishSegments_doesNotRetry_ifFailureIsNotRetryable() 
throws IOException
+  {
+    final AtomicInteger attemptCount = new AtomicInteger(0);
+    final TransactionalSegmentPublisher publisher = createPublisher(
+        SegmentPublishResult.fail("this error is not retryable"),
+        attemptCount
+    );
+    // A result created via fail() is expected to be returned after a single attempt.
+    Assert.assertEquals(
+        SegmentPublishResult.fail("this error is not retryable"),
+        publisher.publishSegments(null, Set.of(), Function.identity(), null, 
null)
+    );
+    Assert.assertEquals(1, attemptCount.get());  // only the initial attempt
+  }
+
+  @Test
+  public void testPublishAnnotatedSegments_doesNotRetry() throws Exception
+  {
+    final AtomicInteger attemptCount = new AtomicInteger(0);
+    final TransactionalSegmentPublisher publisher = createPublisher(
+        SegmentPublishResult.retryableFailure("this error is retryable"),
+        attemptCount
+    );
+    // Calling publishAnnotatedSegments() directly must not retry, even for a retryable failure.
+    Assert.assertEquals(
+        SegmentPublishResult.retryableFailure("this error is retryable"),
+        publisher.publishAnnotatedSegments(null, Set.of(), null, null)
+    );
+    Assert.assertEquals(1, attemptCount.get());  // exactly one invocation
+  }
+
+  private TransactionalSegmentPublisher createPublisher(
+      SegmentPublishResult publishResult,
+      AtomicInteger attemptCount
+  )
+  {
+    return new TransactionalSegmentPublisher()
+    {
+      @Override
+      public SegmentPublishResult publishAnnotatedSegments(
+          @Nullable Set<DataSegment> segmentsToBeOverwritten,
+          Set<DataSegment> segmentsToPublish,
+          @Nullable Object commitMetadata,
+          @Nullable SegmentSchemaMapping segmentSchemaMapping
+      )
+      {
+        attemptCount.incrementAndGet();  // count every invocation of this method
+        return publishResult;  // same canned result on every call
+      }
+    };
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to