This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 047c7340ab3 Adding retries to update the metadata store instead of 
failure (#15141)
047c7340ab3 is described below

commit 047c7340ab3981cdb987cf6ddaaa541f9f72f2df
Author: PANKAJ KUMAR <[email protected]>
AuthorDate: Wed Jan 10 12:30:54 2024 +0530

    Adding retries to update the metadata store instead of failure (#15141)
    
    Currently, if two tasks consume from the same partitions and both try to
    publish segments and update the metadata, the second task can fail because
    the end offset stored in the metadata store doesn't match the start offset
    of the second task. We can fix this by retrying instead of failing.
    
    As far as I know, apart from the above issue, a metadata mismatch can happen
    in two scenarios:
    
    - when we update the input topic name for the data source
    - when we run two replicas of an ingestion task (one replica publishes and
      the other fails because the first replica has already updated the metadata).
    
    Implemented a compareTo function to compare the last committed end offset
    with the new sequence start offset, and return a specific error message when
    the new start offset is ahead of the last committed end offset.
    
    Added retry logic to the indexers so that they retry on this specific error message.
    
    Updated the existing test cases and added tests for compareTo.
---
 .../indexing/kafka/KafkaDataSourceMetadata.java    |  20 ++-
 .../SeekableStreamEndSequenceNumbers.java          |  33 ++++
 .../SeekableStreamSequenceNumbers.java             |   8 +
 .../SeekableStreamStartSequenceNumbers.java        |  33 ++++
 .../SegmentTransactionalInsertActionTest.java      |   6 +-
 .../SeekableStreamEndSequenceNumbersTest.java      |  37 +++++
 .../SeekableStreamStartSequenceNumbersTest.java    |  41 +++++
 .../IndexerSQLMetadataStorageCoordinator.java      |  19 +++
 .../appenderator/BaseAppenderatorDriver.java       | 168 +++++++++++----------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |   5 +-
 10 files changed, 283 insertions(+), 87 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
index 96c94951e0e..a7b73f445ac 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
@@ -26,8 +26,11 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.java.util.common.IAE;
 
-public class KafkaDataSourceMetadata extends 
SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long>
+import java.util.Comparator;
+
+public class KafkaDataSourceMetadata extends 
SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> implements 
Comparable<KafkaDataSourceMetadata>
 {
 
   @JsonCreator
@@ -58,4 +61,19 @@ public class KafkaDataSourceMetadata extends 
SeekableStreamDataSourceMetadata<Ka
   {
     return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers);
   }
+
+  @Override
+  // This method is to compare KafkaDataSourceMetadata.
+  // It compares this and other SeekableStreamSequenceNumbers using 
naturalOrder comparator.
+  public int compareTo(KafkaDataSourceMetadata other)
+  {
+    if (!getClass().equals(other.getClass())) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+    return 
getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(),
 Comparator.naturalOrder());
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index 3f2f7bfe1bc..a95f4cccab6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.common.IAE;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -147,6 +148,38 @@ public class 
SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
     }
   }
 
+  @Override
+  public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, 
SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
+  {
+    if (this.getClass() != other.getClass()) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+
+    final SeekableStreamEndSequenceNumbers<PartitionIdType, 
SequenceOffsetType> otherStart =
+        (SeekableStreamEndSequenceNumbers<PartitionIdType, 
SequenceOffsetType>) other;
+
+    if (stream.equals(otherStart.stream)) {
+      //Same stream, compare the offset
+      boolean res = false;
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
partitionSequenceNumberMap.entrySet()) {
+        PartitionIdType partitionId = entry.getKey();
+        SequenceOffsetType sequenceOffset = entry.getValue();
+        if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && 
comparator.compare(sequenceOffset, 
otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) {
+          res = true;
+          break;
+        }
+      }
+      if (res) {
+        return 1;
+      }
+    }
+    return 0;
+  }
+
   @Override
   public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
minus(
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
index a790974e25f..44f1343e25d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 
+import java.util.Comparator;
 import java.util.Map;
 
 @JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = 
SeekableStreamEndSequenceNumbers.class)
@@ -61,4 +62,11 @@ public interface 
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetTy
   SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus(
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
   );
+
+  /**
+   * Compare this and the other sequence offsets using comparator.
+   * Returns 1, if this sequence is ahead of the other.
+   * otherwise, Return 0
+   */
+  int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, 
SequenceOffsetType> seekableStreamSequenceNumbers, 
Comparator<SequenceOffsetType> comparator);
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index 52e5f31cf1d..b1a72ff3390 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.IAE;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -161,6 +162,38 @@ public class 
SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     }
   }
 
+  @Override
+  public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, 
SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
+  {
+    if (this.getClass() != other.getClass()) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+
+    final SeekableStreamStartSequenceNumbers<PartitionIdType, 
SequenceOffsetType> otherStart =
+        (SeekableStreamStartSequenceNumbers<PartitionIdType, 
SequenceOffsetType>) other;
+
+    if (stream.equals(otherStart.stream)) {
+      //Same stream, compare the offset
+      boolean res = false;
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
partitionSequenceNumberMap.entrySet()) {
+        PartitionIdType partitionId = entry.getKey();
+        SequenceOffsetType sequenceOffset = entry.getValue();
+        if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && 
comparator.compare(sequenceOffset, 
otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) {
+          res = true;
+          break;
+        }
+      }
+      if (res) {
+        return 1;
+      }
+    }
+    return 0;
+  }
+
   @Override
   public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
minus(
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index b8bdbfb2add..6f8e827c705 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -151,11 +151,7 @@ public class SegmentTransactionalInsertActionTest
     );
 
     Assert.assertEquals(
-        SegmentPublishResult.fail(
-          "java.lang.RuntimeException: Inconsistent metadata state. " +
-          "This can happen if you update input topic in a spec without 
changing the supervisor name. " +
-          "Stored state: [null], Target state: 
[ObjectMetadata{theObject=[1]}]."
-        ),
+        SegmentPublishResult.fail("java.lang.RuntimeException: Failed to 
update the metadata Store. The new start metadata is ahead of last commited end 
state."),
         result
     );
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
index 691f579d8e6..9cf29d6cbd2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbersTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.TestHelper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Comparator;
 import java.util.Map;
 
 public class SeekableStreamEndSequenceNumbersTest
@@ -95,4 +96,40 @@ public class SeekableStreamEndSequenceNumbersTest
         endSequenceNumbers.asStartPartitions(true)
     );
   }
+
+  @Test
+  public void testCompareToWithTrueResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 5L, 2, 6L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions1 = new 
SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap1
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions2 = new 
SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap2
+    );
+    Assert.assertEquals(1, partitions1.compareTo(partitions2, 
Comparator.naturalOrder()));
+  }
+
+  @Test
+  public void testCompareToWithFalseResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 3L, 2, 2L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions1 = new 
SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap1
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamEndSequenceNumbers<Integer, Long> partitions2 = new 
SeekableStreamEndSequenceNumbers<>(
+        stream,
+        offsetMap2
+    );
+    Assert.assertEquals(0, partitions1.compareTo(partitions2, 
Comparator.naturalOrder()));
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
index f4342e27acb..f632cf61ecf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.TestHelper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Comparator;
 import java.util.Map;
 
 public class SeekableStreamStartSequenceNumbersTest
@@ -74,4 +75,44 @@ public class SeekableStreamStartSequenceNumbersTest
         OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new 
TypeReference<Map<Integer, Long>>() {})
     );
   }
+
+  @Test
+  public void testCompareToWithTrueResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 5L, 2, 6L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions1 = new 
SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap1,
+        ImmutableSet.of(6)
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = new 
SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap2,
+        ImmutableSet.of(6)
+    );
+    Assert.assertEquals(1, partitions1.compareTo(partitions2, 
Comparator.naturalOrder()));
+  }
+
+  @Test
+  public void testCompareToWithFalseResult()
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap1 = ImmutableMap.of(1, 3L, 2, 2L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions1 = new 
SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap1,
+        ImmutableSet.of(6)
+    );
+
+    final Map<Integer, Long> offsetMap2 = ImmutableMap.of(1, 4L, 2, 4L);
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = new 
SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap2,
+        ImmutableSet.of(6)
+    );
+    Assert.assertEquals(0, partitions1.compareTo(partitions2, 
Comparator.naturalOrder()));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e4f5dd318d6..252386c56bd 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2404,11 +2404,19 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
 
     final boolean startMetadataMatchesExisting;
+    int startMetadataGreaterThanExisting = 0;
 
     if (oldCommitMetadataFromDb == null) {
       startMetadataMatchesExisting = startMetadata.isValidStart();
+      startMetadataGreaterThanExisting = 1;
     } else {
       // Checking against the last committed metadata.
+      // If the new start sequence number is greater than the end sequence 
number of last commit compareTo() function will return 1,
+      // 0 in all other cases. It might be because multiple tasks are 
publishing the sequence at around same time.
+      if (startMetadata instanceof Comparable) {
+        startMetadataGreaterThanExisting = ((Comparable) 
startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+      }
+
       // Converting the last one into start metadata for checking since only 
the same type of metadata can be matched.
       // Even though kafka/kinesis indexing services use different 
sequenceNumber types for representing
       // start and end sequenceNumbers, the below conversion is fine because 
the new start sequenceNumbers are supposed
@@ -2416,6 +2424,17 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       startMetadataMatchesExisting = 
startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
     }
 
+    if (startMetadataGreaterThanExisting == 1 && 
!startMetadataMatchesExisting) {
+      // Offset stored in StartMetadata is Greater than the last commited 
metadata,
+      // Then retry multiple task might be trying to publish the segment for 
same partitions.
+      log.info("Failed to update the metadata Store. The new start metadata: 
[%s] is ahead of last commited end state: [%s].",
+          startMetadata,
+          oldCommitMetadataFromDb);
+      return new DataStoreMetadataUpdateResult(true, false,
+          "Failed to update the metadata Store. The new start metadata is 
ahead of last commited end state."
+      );
+    }
+
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
       return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 9c212336a17..73d2d3717ab 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -596,91 +597,104 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
                                   ? null
                                   : ((AppenderatorDriverMetadata) 
metadata).getCallerMetadata();
     return executor.submit(
-        () -> {
-          try {
-            final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
-            final SegmentPublishResult publishResult = 
publisher.publishSegments(
-                segmentsToBeOverwritten,
-                ourSegments,
-                outputSegmentsAnnotateFunction,
-                callerMetadata
-            );
-
-            if (publishResult.isSuccess()) {
-              log.info(
-                  "Published [%s] segments with commit metadata [%s]",
-                  segmentsAndCommitMetadata.getSegments().size(),
-                  callerMetadata
-              );
-              log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
-            } else {
-              // Publishing didn't affirmatively succeed. However, segments 
with our identifiers may still be active
-              // now after all, for two possible reasons:
-              //
-              // 1) A replica may have beat us to publishing these segments. 
In this case we want to delete the
-              //    segments we pushed (if they had unique paths) to avoid 
wasting space on deep storage.
-              // 2) We may have actually succeeded, but not realized it due to 
missing the confirmation response
-              //    from the overlord. In this case we do not want to delete 
the segments we pushed, since they are
-              //    now live!
-
-              final Set<SegmentIdWithShardSpec> segmentsIdentifiers = 
segmentsAndCommitMetadata
-                  .getSegments()
-                  .stream()
-                  .map(SegmentIdWithShardSpec::fromDataSegment)
-                  .collect(Collectors.toSet());
-
-              final Set<DataSegment> activeSegments = 
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
-              if (activeSegments.equals(ourSegments)) {
-                log.info(
-                    "Could not publish [%s] segments, but checked and found 
them already published; continuing.",
-                    ourSegments.size()
-                );
-                log.infoSegments(
-                    segmentsAndCommitMetadata.getSegments(),
-                    "Could not publish segments"
+      () -> {
+        try {
+          RetryUtils.retry(
+              () -> {
+              try {
+                final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
+                final SegmentPublishResult publishResult = 
publisher.publishSegments(
+                    segmentsToBeOverwritten,
+                    ourSegments,
+                    outputSegmentsAnnotateFunction,
+                    callerMetadata
                 );
 
-                // Clean up pushed segments if they are physically disjoint 
from the published ones (this means
-                // they were probably pushed by a replica, and with the unique 
paths option).
-                final boolean physicallyDisjoint = Sets.intersection(
-                    
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-                    
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
-                ).isEmpty();
-
-                if (physicallyDisjoint) {
-                  
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-                }
-              } else {
-                // Our segments aren't active. Publish failed for some reason. 
Clean them up and then throw an error.
-                
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
-                if (publishResult.getErrorMsg() != null) {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE(
-                      "Failed to publish segments because of [%s]",
-                      publishResult.getErrorMsg()
+                if (publishResult.isSuccess()) {
+                  log.info(
+                      "Published [%s] segments with commit metadata [%s]",
+                      segmentsAndCommitMetadata.getSegments().size(),
+                      callerMetadata
                   );
+                  log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
                 } else {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE("Failed to publish segments");
+                  // Publishing didn't affirmatively succeed. However, 
segments with our identifiers may still be active
+                  // now after all, for two possible reasons:
+                  //
+                  // 1) A replica may have beat us to publishing these 
segments. In this case we want to delete the
+                  //    segments we pushed (if they had unique paths) to avoid 
wasting space on deep storage.
+                  // 2) We may have actually succeeded, but not realized it 
due to missing the confirmation response
+                  //    from the overlord. In this case we do not want to 
delete the segments we pushed, since they are
+                  //    now live!
+
+                  final Set<SegmentIdWithShardSpec> segmentsIdentifiers = 
segmentsAndCommitMetadata
+                      .getSegments()
+                      .stream()
+                      .map(SegmentIdWithShardSpec::fromDataSegment)
+                      .collect(Collectors.toSet());
+
+                  final Set<DataSegment> activeSegments = 
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+                  if (activeSegments.equals(ourSegments)) {
+                    log.info(
+                        "Could not publish [%s] segments, but checked and 
found them already published; continuing.",
+                        ourSegments.size()
+                    );
+                    log.infoSegments(
+                        segmentsAndCommitMetadata.getSegments(),
+                        "Could not publish segments"
+                    );
+
+                    // Clean up pushed segments if they are physically 
disjoint from the published ones (this means
+                    // they were probably pushed by a replica, and with the 
unique paths option).
+                    final boolean physicallyDisjoint = Sets.intersection(
+                        
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                        
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+                    ).isEmpty();
+
+                    if (physicallyDisjoint) {
+                      
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                    }
+                  } else {
+                    log.errorSegments(ourSegments, "Failed to publish 
segments");
+                    if (publishResult.getErrorMsg() != null && 
publishResult.getErrorMsg().contains(("Failed to update the metadata Store. The 
new start metadata is ahead of last commited end state."))) {
+                      throw new ISE(publishResult.getErrorMsg());
+                    }
+                    // Our segments aren't active. Publish failed for some 
reason. Clean them up and then throw an error.
+                    
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                    if (publishResult.getErrorMsg() != null) {
+                      throw new ISE("Failed to publish segments because of 
[%s]", publishResult.getErrorMsg());
+                    }
+                    throw new ISE("Failed to publish segments");
+                  }
                 }
               }
-            }
-          }
-          catch (Exception e) {
-            // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
-            log.noStackTrace().warn(e, "Failed publish");
-            log.warnSegments(
-                segmentsAndCommitMetadata.getSegments(),
-                "Failed publish, not removing segments"
-            );
-            Throwables.propagateIfPossible(e);
-            throw new RuntimeException(e);
+              catch (Exception e) {
+                // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
+                log.noStackTrace().warn(e, "Failed publish");
+                log.warnSegments(
+                    segmentsAndCommitMetadata.getSegments(),
+                    "Failed publish, not removing segments"
+                );
+                Throwables.propagateIfPossible(e);
+                throw new RuntimeException(e);
+              }
+              return segmentsAndCommitMetadata;
+            },
+              e -> (e.getMessage() != null && e.getMessage().contains("Failed 
to update the metadata Store. The new start metadata is ahead of last commited 
end state.")),
+              RetryUtils.DEFAULT_MAX_TRIES
+          );
+        }
+        catch (Exception e) {
+          if (e.getMessage() != null && e.getMessage().contains("Failed to 
update the metadata Store. The new start metadata is ahead of last commited end 
state.")) {
+            // Publish failed for some reason. Clean them up and then throw an 
error.
+            
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
           }
-
-          return segmentsAndCommitMetadata;
+          Throwables.propagateIfPossible(e);
+          throw new RuntimeException(e);
         }
+        return segmentsAndCommitMetadata;
+      }
     );
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9d2b3d495c6..9132fb23630 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -936,10 +936,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: 
Inconsistent metadata state. This can " +
-        "happen if you update input topic in a spec without changing the 
supervisor name. " +
-        "Stored state: [null], " +
-        "Target state: [ObjectMetadata{theObject={foo=bar}}]."), result1);
+    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: 
Failed to update the metadata Store. The new start metadata is ahead of last 
commited end state."), result1);
 
     // Should only be tried once.
     Assert.assertEquals(1, metadataUpdateCounter.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to