This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new 607d4d219b3 [Backport] Do not allocate ids conflicting with existing 
segment ids (#16380) (#16383)
607d4d219b3 is described below

commit 607d4d219b34996dc9229c0ccfe2286c4e366393
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Sun May 5 09:01:20 2024 +0530

    [Backport] Do not allocate ids conflicting with existing segment ids 
(#16380) (#16383)
---
 .../common/actions/SegmentAllocateActionTest.java  | 52 ++++++++++++
 .../IndexerSQLMetadataStorageCoordinator.java      | 99 +++++++++++++++-------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 55 ++++++++++++
 3 files changed, 174 insertions(+), 32 deletions(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index f2da105d269..1857f6d67f7 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -29,6 +29,8 @@ import org.apache.druid.indexing.common.SegmentLock;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -1062,6 +1064,45 @@ public class SegmentAllocateActionTest
     Assert.assertEquals(Duration.ofDays(1).toMillis(), 
id2.getInterval().toDurationMillis());
   }
 
+  @Test
+  public void testSegmentIdMustNotBeReused() throws IOException
+  {
+    final IndexerMetadataStorageCoordinator coordinator = 
taskActionTestKit.getMetadataStorageCoordinator();
+    final TaskLockbox lockbox = taskActionTestKit.getTaskLockbox();
+    final Task task0 = NoopTask.ofPriority(25);
+    lockbox.add(task0);
+    final NoopTask task1 = NoopTask.ofPriority(50);
+    lockbox.add(task1);
+
+    // Allocate and commit for older task task0
+    final SegmentIdWithShardSpec id0 =
+        allocate(task0, DateTimes.nowUtc(), Granularities.NONE, 
Granularities.ALL, "seq", "0");
+    final DataSegment dataSegment0 = getSegmentForIdentifier(id0);
+    coordinator.commitSegments(ImmutableSet.of(dataSegment0), null);
+    lockbox.unlock(task0, Intervals.ETERNITY);
+
+    // Allocate and commit for newer task task1. Pending segments are cleaned 
up
+    final SegmentIdWithShardSpec id1 =
+        allocate(task1, DateTimes.nowUtc(), Granularities.NONE, 
Granularities.ALL, "seq", "1");
+    final DataSegment dataSegment1 = getSegmentForIdentifier(id1);
+    final SegmentIdWithShardSpec id2 =
+        allocate(task1, DateTimes.nowUtc(), Granularities.NONE, 
Granularities.ALL, "seq", "2");
+    final DataSegment dataSegment2 = getSegmentForIdentifier(id2);
+    coordinator.commitSegments(ImmutableSet.of(dataSegment1, dataSegment2), 
null);
+    // Clean up pending segments corresponding to the last pending segment
+    coordinator.deletePendingSegmentsForTaskAllocatorId(task1.getDataSource(), 
task1.getTaskAllocatorId());
+
+    // Drop all segments
+    coordinator.markSegmentsAsUnusedWithinInterval(task0.getDataSource(), 
Intervals.ETERNITY);
+
+    // Allocate another id and ensure that it doesn't exist in the 
druid_segments table
+    final SegmentIdWithShardSpec theId =
+        allocate(task1, DateTimes.nowUtc(), Granularities.NONE, 
Granularities.ALL, "seq", "3");
+    
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(),
 true));
+
+    lockbox.unlock(task1, Intervals.ETERNITY);
+  }
+
   private SegmentIdWithShardSpec allocate(
       final Task task,
       final DateTime timestamp,
@@ -1123,4 +1164,15 @@ public class SegmentAllocateActionTest
     Assert.assertEquals(expected, actual);
     Assert.assertEquals(expected.getShardSpec(), actual.getShardSpec());
   }
+
+  private DataSegment getSegmentForIdentifier(SegmentIdWithShardSpec 
identifier)
+  {
+    return DataSegment.builder()
+                      .dataSource(identifier.getDataSource())
+                      .interval(identifier.getInterval())
+                      .version(identifier.getVersion())
+                      .shardSpec(identifier.getShardSpec())
+                      .size(100)
+                      .build();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 618173a5db7..2b02f09926b 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
@@ -1013,33 +1012,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return allocatedSegmentIds;
   }
 
-  @SuppressWarnings("UnstableApiUsage")
-  private String getSequenceNameAndPrevIdSha(
-      SegmentCreateRequest request,
-      SegmentIdWithShardSpec pendingSegmentId,
-      boolean skipSegmentLineageCheck
-  )
-  {
-    final Hasher hasher = Hashing.sha1().newHasher()
-                                 
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
-                                 .putByte((byte) 0xff);
-
-    if (skipSegmentLineageCheck) {
-      final Interval interval = pendingSegmentId.getInterval();
-      hasher
-          .putLong(interval.getStartMillis())
-          .putLong(interval.getEndMillis());
-    } else {
-      hasher
-          .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
-    }
-
-    hasher.putByte((byte) 0xff);
-    hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion()));
-
-    return BaseEncoding.base16().encode(hasher.hash().asBytes());
-  }
-
   @Nullable
   private SegmentIdWithShardSpec allocatePendingSegment(
       final Handle handle,
@@ -1727,7 +1699,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       // The number of core partitions must always be chosen from the set of 
used segments in the SegmentTimeline.
       // When the core partitions have been dropped, using pending segments 
may lead to an incorrect state
       // where the chunk is believed to have core partitions and queries 
results are incorrect.
-
       SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
           dataSource,
           interval,
@@ -1739,7 +1710,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           )
       );
       return new PendingSegmentRecord(
-          pendingSegmentId,
+          getTrueAllocatedId(pendingSegmentId),
           request.getSequenceName(),
           request.getPreviousSegmentId(),
           request.getUpgradedFromSegmentId(),
@@ -1875,8 +1846,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       // The number of core partitions must always be chosen from the set of 
used segments in the SegmentTimeline.
       // When the core partitions have been dropped, using pending segments 
may lead to an incorrect state
       // where the chunk is believed to have core partitions and queries 
results are incorrect.
-
-      return new SegmentIdWithShardSpec(
+      final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
           dataSource,
           interval,
           Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
@@ -1886,7 +1856,72 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
               committedMaxId == null ? 0 : 
committedMaxId.getShardSpec().getNumCorePartitions()
           )
       );
+      return getTrueAllocatedId(allocatedId);
+    }
+  }
+
+  /**
+   * Verifies that the allocated id doesn't already exist in the druid 
segments table.
+   * If it does, try to get the max unallocated id considering the unused segments 
for the datasource, version and interval.
+   * Otherwise, use the same id.
+   * @param allocatedId The segment allocated on the basis of used and pending 
segments
+   * @return a segment id that isn't already used by other unused segments
+   */
+  private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec 
allocatedId)
+  {
+    // Check if there is a conflict with an existing entry in the segments 
table
+    if (retrieveSegmentForId(allocatedId.asSegmentId().toString(), true) == 
null) {
+      return allocatedId;
+    }
+
+    // If yes, try to compute allocated partition num using the max unused 
segment shard spec
+    SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
+        allocatedId.getDataSource(),
+        allocatedId.getInterval(),
+        allocatedId.getVersion()
+    );
+    // No unused segment. Just return the allocated id
+    if (unusedMaxId == null) {
+      return allocatedId;
+    }
+
+    int maxPartitionNum = Math.max(
+        allocatedId.getShardSpec().getPartitionNum(),
+        unusedMaxId.getShardSpec().getPartitionNum() + 1
+    );
+    return new SegmentIdWithShardSpec(
+        allocatedId.getDataSource(),
+        allocatedId.getInterval(),
+        allocatedId.getVersion(),
+        new NumberedShardSpec(
+            maxPartitionNum,
+            allocatedId.getShardSpec().getNumCorePartitions()
+        )
+    );
+  }
+
+  private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval 
interval, String version)
+  {
+    List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
+        datasource,
+        interval,
+        ImmutableList.of(version),
+        null,
+        null
+    );
+
+    SegmentIdWithShardSpec unusedMaxId = null;
+    int maxPartitionNum = -1;
+    for (DataSegment unusedSegment : unusedSegments) {
+      if (unusedSegment.getInterval().equals(interval)) {
+        int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
+        if (maxPartitionNum < partitionNum) {
+          maxPartitionNum = partitionNum;
+          unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
+        }
+      }
     }
+    return unusedMaxId;
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 29b7f92e8e5..fc43d7126fe 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3192,4 +3192,59 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     SegmentTimeline segmentTimeline = 
SegmentTimeline.forSegments(allUsedSegments);
     Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
   }
+
+  @Test
+  public void testSegmentIdShouldNotBeReallocated() throws IOException
+  {
+    final SegmentIdWithShardSpec idWithNullTaskAllocator = 
coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        "0",
+        Intervals.ETERNITY,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false,
+        null
+    );
+    final DataSegment dataSegment0 = createSegment(
+        idWithNullTaskAllocator.getInterval(),
+        idWithNullTaskAllocator.getVersion(),
+        idWithNullTaskAllocator.getShardSpec()
+    );
+
+    final SegmentIdWithShardSpec idWithValidTaskAllocator = 
coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        "1",
+        Intervals.ETERNITY,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false,
+        "taskAllocatorId"
+    );
+    final DataSegment dataSegment1 = createSegment(
+        idWithValidTaskAllocator.getInterval(),
+        idWithValidTaskAllocator.getVersion(),
+        idWithValidTaskAllocator.getShardSpec()
+    );
+
+    // Insert pending segments
+    coordinator.commitSegments(ImmutableSet.of(dataSegment0, dataSegment1), 
null);
+    // Clean up pending segments corresponding to the valid task allocator id
+    coordinator.deletePendingSegmentsForTaskAllocatorId(DS.WIKI, 
"taskAllocatorId");
+    // Mark all segments as unused
+    coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, 
Intervals.ETERNITY);
+
+    final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        "2",
+        Intervals.ETERNITY,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false,
+        "taskAllocatorId"
+    );
+    
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(),
 true));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to