This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new acd3b291c5a Fix concurrent append to interval with only unused 
segments (#18216)
acd3b291c5a is described below

commit acd3b291c5a6a10c2c9a52259d5f4fa7e5a3317d
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jul 8 22:43:41 2025 +0530

    Fix concurrent append to interval with only unused segments (#18216)
    
    Bug:
    Concurrent append uses lock of type APPEND which always uses a lock version 
of epoch 1970-01-01.
    
    This can cause data loss in a flow as follows:
    - Ingest data using an APPEND task to an empty interval
    - Mark all the segments as unused
    - Re-run the APPEND task
    - Data is not visible since old segment IDs (now unused) are allocated again
    
    Fix:
    In segment allocation, do not reuse an old segment ID, used or unused.
    This fix was already done for some cases back in #16380.
    An embedded test for this has been included in #18207.
---
 .../concurrent/ConcurrentReplaceAndAppendTest.java | 32 ++++++++++++++++++++++
 .../IndexerSQLMetadataStorageCoordinator.java      |  4 ++-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 76cbe210611..d052fd65489 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -1119,6 +1119,38 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, 
segmentV12);
   }
 
+  @Test
+  public void test_concurrentAppend_toIntervalWithUnusedSegments()
+  {
+    // Allocate and commit an APPEND segment
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), 
Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+    Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+    // Mark it as unused
+    
getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
+
+    // Allocate and commit another APPEND segment
+    final SegmentIdWithShardSpec pendingSegment2
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), 
Granularities.DAY);
+    Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
+    Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
+
+    final DataSegment segmentV02 = asSegment(pendingSegment2);
+    appendTask.commitAppendSegments(segmentV02);
+    Assert.assertNotEquals(segmentV01, segmentV02);
+
+    verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV02);
+    verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
+  }
 
   @Nullable
   private DataSegment findSegmentWith(String version, Map<String, Object> 
loadSpec, Set<DataSegment> segments)
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 5f6f817313e..dade1668649 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1419,6 +1419,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           version,
           partialShardSpec.complete(jsonMapper, newPartitionId, 0)
       );
+      pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
       return PendingSegmentRecord.create(
           pendingSegmentId,
           request.getSequenceName(),
@@ -1555,12 +1556,13 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                                  ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
                                  : PartitionIds.ROOT_GEN_START_PARTITION_ID;
       String version = newSegmentVersion == null ? existingVersion : 
newSegmentVersion;
-      return new SegmentIdWithShardSpec(
+      SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
           dataSource,
           interval,
           version,
           partialShardSpec.complete(jsonMapper, newPartitionId, 0)
       );
+      return getTrueAllocatedId(transaction, allocatedId);
     } else if (!overallMaxId.getInterval().equals(interval)) {
       log.warn(
           "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to