This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new acd3b291c5a Fix concurrent append to interval with only unused
segments (#18216)
acd3b291c5a is described below
commit acd3b291c5a6a10c2c9a52259d5f4fa7e5a3317d
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jul 8 22:43:41 2025 +0530
Fix concurrent append to interval with only unused segments (#18216)
Bug:
Concurrent append uses lock of type APPEND which always uses a lock version
of epoch 1970-01-01.
This can cause data loss in a flow as follows:
- Ingest data using an APPEND task to an empty interval
- Mark all the segments as unused
- Re-run the APPEND task
- Data is not visible since old segment IDs (now unused) are allocated again
Fix:
In segment allocation, do not reuse an old segment ID, used or unused.
This fix was already done for some cases back in #16380.
An embedded test for this has been included in #18207.
---
.../concurrent/ConcurrentReplaceAndAppendTest.java | 32 ++++++++++++++++++++++
.../IndexerSQLMetadataStorageCoordinator.java | 4 ++-
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 76cbe210611..d052fd65489 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -1119,6 +1119,38 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11,
segmentV12);
}
+ @Test
+ public void test_concurrentAppend_toIntervalWithUnusedSegments()
+ {
+ // Allocate and commit an APPEND segment
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(),
Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+ Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
+
+ // Mark it as unused
+
getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
+
+ // Allocate and commit another APPEND segment
+ final SegmentIdWithShardSpec pendingSegment2
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(),
Granularities.DAY);
+ Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
+ Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
+
+ final DataSegment segmentV02 = asSegment(pendingSegment2);
+ appendTask.commitAppendSegments(segmentV02);
+ Assert.assertNotEquals(segmentV01, segmentV02);
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV02);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
+ }
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object>
loadSpec, Set<DataSegment> segments)
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 5f6f817313e..dade1668649 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1419,6 +1419,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
+ pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
return PendingSegmentRecord.create(
pendingSegmentId,
request.getSequenceName(),
@@ -1555,12 +1556,13 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = newSegmentVersion == null ? existingVersion :
newSegmentVersion;
- return new SegmentIdWithShardSpec(
+ SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
+ return getTrueAllocatedId(transaction, allocatedId);
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]