This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/34.0.0 by this push:
     new ae252d9f015 Fix concurrent append to interval with unused segments (#18230) (#18275)
ae252d9f015 is described below

commit ae252d9f0154ab4546a8d61a4e977231f133021b
Author: Lucas Capistrant <capistr...@users.noreply.github.com>
AuthorDate: Thu Jul 17 12:23:43 2025 -0500

    Fix concurrent append to interval with unused segments (#18230) (#18275)
    
    This is a better approach to the fix in #18216
    
    Changes:
    - When allocating the first segment in an interval which already contains an unused segment,
    use a fresh version rather than reusing the old version (now unused)
    
    Co-authored-by: Kashif Faraz <kashif.fa...@gmail.com>
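    
    In essence, the allocator now probes for a fresh version by appending an
    "S" suffix to the allocated version until the candidate no longer matches
    any unused-segment version in the interval, giving up after a bounded
    number of attempts. A minimal standalone sketch of that rule follows; the
    class, method, and exception names here are illustrative, not the
    coordinator's actual API:
    
        import java.util.Set;
    
        public class FreshVersionSketch
        {
          // Mirrors PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX
          private static final String SUFFIX = "S";
          // Retry bound before the coordinator gives up and fails the allocation
          private static final int MAX_ATTEMPTS = 10;
    
          // Returns the first candidate, starting from allocatedVersion, that does
          // not clash with any unused-segment version; throws once all attempts clash.
          static String freshVersion(String allocatedVersion, Set<String> unusedVersions)
          {
            final StringBuilder candidate = new StringBuilder(allocatedVersion);
            for (int i = 0; i < MAX_ATTEMPTS; ++i) {
              if (!unusedVersions.contains(candidate.toString())) {
                return candidate.toString();
              }
              candidate.append(SUFFIX);
            }
            throw new IllegalStateException("Too many clashing unused versions; kill them to proceed");
          }
    
          public static void main(String[] args)
          {
            // One unused segment at the APPEND lock version means the next
            // allocation gets a fresh version with a single "S" suffix.
            final Set<String> unused = Set.of("1970-01-01T00:00:00.000Z");
            System.out.println(freshVersion("1970-01-01T00:00:00.000Z", unused));
            // prints: 1970-01-01T00:00:00.000ZS
          }
        }
    
    The real implementation additionally reads the clashing versions from the
    metadata store and distinguishes primary from secondary allocations; see
    getUniqueIdForPrimaryAllocation in the diff below.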
---
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  59 +++++++-
 .../org/apache/druid/error/ExceptionMatcher.java   |   7 +
 .../IndexerSQLMetadataStorageCoordinator.java      | 101 +++++++++++--
 .../druid/metadata/PendingSegmentRecord.java       |  13 ++
 .../druid/metadata/SqlSegmentsMetadataQuery.java   |  23 +++
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 168 +++++++++++++++++++++
 6 files changed, 357 insertions(+), 14 deletions(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index d052fd65489..be2d351f1c5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -23,6 +23,8 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.error.ExceptionMatcher;
 import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
@@ -63,6 +65,7 @@ import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.After;
@@ -1120,7 +1123,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
   }
 
   @Test
-  public void test_concurrentAppend_toIntervalWithUnusedSegments()
+  public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion()
   {
     // Allocate and commit an APPEND segment
     final SegmentIdWithShardSpec pendingSegment
@@ -1141,8 +1144,10 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
     // Allocate and commit another APPEND segment
     final SegmentIdWithShardSpec pendingSegment2
         = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
-    Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
-    Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
+
+    // Verify that the new segment gets a different version
+    Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion());
+    Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
 
     final DataSegment segmentV02 = asSegment(pendingSegment2);
     appendTask.commitAppendSegments(segmentV02);
@@ -1152,6 +1157,54 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
     verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
   }
 
+  @Test
+  public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries()
+  {
+    final int maxAllowedAppends = 10;
+    final int expectedPartitionNum = 0;
+    String expectedVersion = SEGMENT_V0;
+
+    // Allocate, commit, delete, repeat
+    for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") {
+      // Allocate a segment and verify its version and partition number
+      final SegmentIdWithShardSpec pendingSegment
+          = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+
+      Assert.assertEquals(expectedVersion, pendingSegment.getVersion());
+      Assert.assertEquals(expectedPartitionNum, pendingSegment.getShardSpec().getPartitionNum());
+
+      // Commit the segment and verify its version and partition number
+      final DataSegment segment = asSegment(pendingSegment);
+      appendTask.commitAppendSegments(segment);
+
+      Assert.assertEquals(expectedVersion, segment.getVersion());
+      Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum());
+
+      verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segment);
+      verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segment);
+
+      // Mark the segment as unused
+      getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
+      verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
+    }
+
+    // Verify that the next attempt fails
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            ISE.class,
+            () -> appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY)
+        ),
+        ExceptionMatcher.of(ISE.class).expectRootCause(
+            DruidExceptionMatcher.internalServerError().expectMessageIs(
+                "Could not allocate segment"
+                + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]"
+                + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])"
+                + " in the interval. Kill the old unused versions to proceed."
+            )
+        )
+    );
+  }
+
   @Nullable
   private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
   {
diff --git a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java
index 31195093254..5bc16c47f27 100644
--- a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.error;
 
+import com.google.common.base.Throwables;
 import org.apache.druid.matchers.DruidMatchers;
 import org.hamcrest.Description;
 import org.hamcrest.DiagnosingMatcher;
@@ -75,6 +76,12 @@ public class ExceptionMatcher extends DiagnosingMatcher<Throwable>
     return this;
   }
 
+  public ExceptionMatcher expectRootCause(Matcher<Throwable> causeMatcher)
+  {
+    matcherList.add(0, DruidMatchers.fn("rootCause", Throwables::getRootCause, causeMatcher));
+    return this;
+  }
+
   @Override
   protected boolean matches(Object item, Description mismatchDescription)
   {
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index dade1668649..a2c5956772f 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -33,6 +33,7 @@ import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -1419,7 +1420,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           version,
           partialShardSpec.complete(jsonMapper, newPartitionId, 0)
       );
-      pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
+      pendingSegmentId = getUniqueIdForPrimaryAllocation(transaction, pendingSegmentId);
       return PendingSegmentRecord.create(
           pendingSegmentId,
           request.getSequenceName(),
@@ -1457,7 +1458,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           )
       );
       return PendingSegmentRecord.create(
-          getTrueAllocatedId(transaction, pendingSegmentId),
+          getUniqueIdForSecondaryAllocation(transaction, pendingSegmentId),
           request.getSequenceName(),
           request.getPreviousSegmentId(),
           null,
@@ -1562,7 +1563,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           version,
           partialShardSpec.complete(jsonMapper, newPartitionId, 0)
       );
-      return getTrueAllocatedId(transaction, allocatedId);
+      return getUniqueIdForPrimaryAllocation(transaction, allocatedId);
     } else if (!overallMaxId.getInterval().equals(interval)) {
       log.warn(
           "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",
@@ -1590,18 +1591,90 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
           )
       );
-      return getTrueAllocatedId(transaction, allocatedId);
+      return getUniqueIdForSecondaryAllocation(transaction, allocatedId);
     }
   }
 
   /**
-   * Verifies that the allocated id doesn't already exist in the druid_segments table.
-   * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval
-   * Otherwise, use the same id.
-   * @param allocatedId The segment allcoted on the basis of used and pending segments
-   * @return a segment id that isn't already used by other unused segments
+   * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
+   * any existing unused segment. If an unused segment already exists that matches
+   * the interval and version of the given {@code allocatedId}, a fresh version
+   * is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}.
+   * Such a conflict can happen only if all the segments in this interval created
+   * by a prior APPEND task were marked as unused.
+   * <p>
+   * This method should be called only when allocating the first segment in an interval.
+   */
+  private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation(
+      SegmentMetadataTransaction transaction,
+      SegmentIdWithShardSpec allocatedId
+  )
+  {
+    // Get all the unused segment versions for this datasource and interval
+    final Set<String> unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval(
+        allocatedId.getDataSource(),
+        allocatedId.getInterval()
+    );
+
+    final String allocatedVersion = allocatedId.getVersion();
+    if (!unusedSegmentVersions.contains(allocatedVersion)) {
+      // Nothing to do, this version is new
+      return allocatedId;
+    } else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) {
+      // Version clash should never happen for non-APPEND locks
+      throw DruidException.defensive(
+          "Cannot allocate segment[%s] as there are already some unused 
segments"
+          + " for version[%s] in this interval.",
+          allocatedId, allocatedVersion
+      );
+    }
+
+    // Iterate until a new non-clashing version is found
+    boolean foundFreshVersion = false;
+    StringBuilder candidateVersion = new StringBuilder(allocatedId.getVersion());
+    for (int i = 0; i < 10; ++i) {
+      if (unusedSegmentVersions.contains(candidateVersion.toString())) {
+        candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX);
+      } else {
+        foundFreshVersion = true;
+        break;
+      }
+    }
+
+    if (foundFreshVersion) {
+      final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
+          allocatedId.getDataSource(),
+          allocatedId.getInterval(),
+          candidateVersion.toString(),
+          allocatedId.getShardSpec()
+      );
+      log.info(
+          "Created new unique pending segment ID[%s] with version[%s] for 
originally allocated ID[%s].",
+          uniqueId, candidateVersion.toString(), allocatedId
+      );
+
+      return uniqueId;
+    } else {
+      throw InternalServerError.exception(
+          "Could not allocate segment[%s] as there are too many clashing 
unused"
+          + " versions(upto [%s]) in the interval. Kill the old unused 
versions to proceed.",
+          allocatedId, candidateVersion.toString()
+      );
+    }
+  }
+
+  /**
+   * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
+   * any existing unused segment. If an unused segment already exists that matches
+   * the interval, version and partition number of the given {@code allocatedId},
+   * a higher partition number is used. Such a conflict can happen only if some
+   * segments of the underlying version have been marked as unused while others
+   * are still used.
+   * <p>
+   * This method should not be called when allocating the first segment in an
+   * interval.
    */
-  private SegmentIdWithShardSpec getTrueAllocatedId(
+  private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation(
       SegmentMetadataTransaction transaction,
       SegmentIdWithShardSpec allocatedId
   )
@@ -1631,7 +1704,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
         allocatedId.getShardSpec().getPartitionNum(),
         unusedMaxId.getPartitionNum() + 1
     );
-    return new SegmentIdWithShardSpec(
+    final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
         allocatedId.getDataSource(),
         allocatedId.getInterval(),
         allocatedId.getVersion(),
@@ -1640,6 +1713,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             allocatedId.getShardSpec().getNumCorePartitions()
         )
     );
+    log.info(
+        "Created new unique pending segment ID[%s] with partition number[%s] 
for originally allocated ID[%s].",
+        uniqueId, maxPartitionNum, allocatedId
+    );
+
+    return uniqueId;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
index 47de746d917..b76160ab9fb 100644
--- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -52,6 +52,19 @@ import java.sql.ResultSet;
  */
 public class PendingSegmentRecord
 {
+  /**
+   * Default lock version used by concurrent APPEND tasks.
+   */
+  public static final String DEFAULT_VERSION_FOR_CONCURRENT_APPEND = DateTimes.EPOCH.toString();
+
+  /**
+   * Suffix to use to construct fresh segment versions in the event of a clash.
+   * The chosen character {@code S} is just for visual ease so that two versions
+   * are not easily confused for each other.
+   * {@code 1970-01-01T00:00:00.000Z_1} vs {@code 1970-01-01T00:00:00.000ZS_1}.
+   */
+  public static final String CONCURRENT_APPEND_VERSION_SUFFIX = "S";
+
   private final SegmentIdWithShardSpec id;
   private final String sequenceName;
   private final String sequencePrevId;
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 2779c16da17..0eba529c7b4 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -1128,6 +1128,29 @@ public class SqlSegmentsMetadataQuery
     return segments.stream().filter(Objects::nonNull).collect(Collectors.toList());
   }
 
+  /**
+   * Retrieves the versions of unused segments which are perfectly aligned with
+   * the given interval.
+   */
+  public Set<String> retrieveUnusedSegmentVersionsWithInterval(String dataSource, Interval interval)
+  {
+    final String sql = StringUtils.format(
+        "SELECT DISTINCT(version) FROM %1$s"
+        + " WHERE dataSource = :dataSource AND used = false"
+        + " AND %2$send%2$s = :end AND start = :start",
+        dbTables.getSegmentsTable(),
+        connector.getQuoteString()
+    );
+    return Set.copyOf(
+        handle.createQuery(sql)
+              .bind("dataSource", dataSource)
+              .bind("start", interval.getStart().toString())
+              .bind("end", interval.getEnd().toString())
+              .mapTo(String.class)
+              .list()
+    );
+  }
+
   /**
    * Retrieve the used segment for a given id if it exists in the metadata store and null otherwise
    */
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 11dda468a5d..c9ee0e97fe7 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.error.ExceptionMatcher;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -80,6 +82,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -3123,6 +3126,112 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());
   }
 
+  @Test
+  public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion()
+  {
+    final String wiki = TestDataSource.WIKI;
+    final String appendLockVersion = PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND;
+    final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D");
+
+    // Allocate and commit an APPEND segment
+    final String taskAllocator1 = "taskAlloc1";
+    final SegmentIdWithShardSpec pendingSegment
+        = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator1);
+
+    Assert.assertNotNull(pendingSegment);
+    Assert.assertEquals(appendLockVersion, pendingSegment.getVersion());
+    Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    coordinator.commitAppendSegments(Set.of(segmentV01), Map.of(), taskAllocator1, null);
+
+    verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV01);
+    verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV01);
+
+    // Mark the segment as unused with a future update time to avoid race conditions
+    final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1);
+    transactionFactory.inReadWriteDatasourceTransaction(
+        wiki,
+        t -> t.markAllSegmentsAsUnused(markUnusedTime)
+    );
+    verifyIntervalHasUsedSegments(wiki, firstOfJan23);
+
+    // Allocate and commit another APPEND segment
+    final String taskAllocator2 = "taskAlloc2";
+    final SegmentIdWithShardSpec pendingSegment2
+        = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator2);
+
+    // Verify that the new segment gets a different version
+    Assert.assertNotNull(pendingSegment2);
+    Assert.assertEquals(appendLockVersion + "S", pendingSegment2.getVersion());
+    Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
+
+    final DataSegment segmentV02 = asSegment(pendingSegment2);
+    coordinator.commitAppendSegments(Set.of(segmentV02), Map.of(), taskAllocator2, null);
+    Assert.assertNotEquals(segmentV01, segmentV02);
+
+    verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV02);
+    verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV02);
+  }
+
+  @Test
+  public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries()
+  {
+    final String wiki = TestDataSource.WIKI;
+    final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D");
+
+    final int maxAllowedAppends = 10;
+    final int expectedPartitionNum = 0;
+
+    String expectedVersion = DateTimes.EPOCH.toString();
+
+    // Allocate, commit, delete, repeat
+    for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") {
+      // Allocate a segment and verify its version and partition number
+      final String taskAllocatorId = IdUtils.getRandomId();
+      final SegmentIdWithShardSpec pendingSegment
+          = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocatorId);
+
+      Assert.assertNotNull(pendingSegment);
+      Assert.assertEquals(expectedVersion, pendingSegment.getVersion());
+      Assert.assertEquals(expectedPartitionNum, pendingSegment.getShardSpec().getPartitionNum());
+
+      // Commit the segment and verify its version and partition number
+      final DataSegment segment = asSegment(pendingSegment);
+      coordinator.commitAppendSegments(Set.of(segment), Map.of(), taskAllocatorId, null);
+
+      Assert.assertEquals(expectedVersion, segment.getVersion());
+      Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum());
+
+      verifyIntervalHasUsedSegments(wiki, firstOfJan23, segment);
+      verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segment);
+
+      // Mark the segment as unused with a future update time to avoid race conditions
+      final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1);
+      transactionFactory.inReadWriteDatasourceTransaction(
+          wiki,
+          t -> t.markAllSegmentsAsUnused(markUnusedTime)
+      );
+      verifyIntervalHasUsedSegments(wiki, firstOfJan23);
+    }
+
+    // Verify that the next attempt fails
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            CallbackFailedException.class,
+            () -> allocatePendingSegmentForAppendTask(wiki, firstOfJan23, IdUtils.getRandomId())
+        ),
+        ExceptionMatcher.of(CallbackFailedException.class).expectRootCause(
+            DruidExceptionMatcher.internalServerError().expectMessageIs(
+                "Could not allocate segment"
+                + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]"
+                + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])"
+                + " in the interval. Kill the old unused versions to proceed."
+            )
+        )
+    );
+  }
+
   @Test
   public void testDeletePendingSegment() throws InterruptedException
   {
@@ -4231,6 +4340,26 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
   }
 
+  private SegmentIdWithShardSpec allocatePendingSegmentForAppendTask(
+      String dataSource,
+      Interval interval,
+      String taskAllocatorId
+  )
+  {
+    return coordinator.allocatePendingSegment(
+        dataSource,
+        interval,
+        true,
+        new SegmentCreateRequest(
+            IdUtils.getRandomId(),
+            null,
+            PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND,
+            NumberedPartialShardSpec.instance(),
+            taskAllocatorId
+        )
+    );
+  }
+
   private int insertPendingSegments(
       String dataSource,
       List<PendingSegmentRecord> pendingSegments,
@@ -4247,4 +4376,43 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
   {
     insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnectorRule, mapper);
   }
+
+  private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
+  {
+    final SegmentId id = pendingSegment.asSegmentId();
+    return new DataSegment(
+        id,
+        Map.of(id.toString(), id.toString()),
+        List.of(),
+        List.of(),
+        pendingSegment.getShardSpec(),
+        null,
+        0,
+        0
+    );
+  }
+
+  private void verifyIntervalHasUsedSegments(
+      String dataSource,
+      Interval interval,
+      DataSegment... expectedSegments
+  )
+  {
+    Assert.assertEquals(
+        Set.of(expectedSegments),
+        coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.INCLUDING_OVERSHADOWED)
+    );
+  }
+
+  private void verifyIntervalHasVisibleSegments(
+      String dataSource,
+      Interval interval,
+      DataSegment... expectedSegments
+  )
+  {
+    Assert.assertEquals(
+        Set.of(expectedSegments),
+        coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.ONLY_VISIBLE)
+    );
+  }
 }

