This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f42ecc9f256 Fail concurrent replace tasks with finer segment granularity than append (#17265)
f42ecc9f256 is described below
commit f42ecc9f25675d7ac1c5c759749ba9e074f4f40e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Oct 8 07:35:13 2024 +0530
Fail concurrent replace tasks with finer segment granularity than append (#17265)
---
.../concurrent/ConcurrentReplaceAndAppendTest.java | 182 +++++++++++++++++++++
.../IndexerSQLMetadataStorageCoordinator.java | 25 ++-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 67 ++++++++
3 files changed, 269 insertions(+), 5 deletions(-)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 96c72e7130d..c74177e2c38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -103,6 +103,9 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
private static final Interval YEAR_23 = Intervals.of("2023/2024");
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
private static final Interval DEC_23 = Intervals.of("2023-12/2024-01");
+ private static final Interval JAN_FEB_MAR_23 =
Intervals.of("2023-01-01/2023-04-01");
+ private static final Interval APR_MAY_JUN_23 =
Intervals.of("2023-04-01/2023-07-01");
+ private static final Interval JUL_AUG_SEP_23 =
Intervals.of("2023-07-01/2023-10-01");
private static final Interval OCT_NOV_DEC_23 =
Intervals.of("2023-10-01/2024-01-01");
private static final Interval FIRST_OF_JAN_23 =
Intervals.of("2023-01-01/2023-01-02");
@@ -599,6 +602,185 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
}
+ @Test
+ public void testLockReplaceQuarterAllocateAppendYear()
+ {
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+ Assert.assertNotNull(replaceLock);
+
+ final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23,
replaceLock.getVersion());
+
+ Assert.assertTrue(
+ replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4)
+ .isSuccess()
+ );
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4);
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
+ Assert.assertEquals(JAN_FEB_MAR_23, pendingSegment.getInterval());
+ Assert.assertEquals(replaceLock.getVersion(), pendingSegment.getVersion());
+
+ final DataSegment appendedSegment = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(appendedSegment);
+
+ verifyIntervalHasUsedSegments(YEAR_23, appendedSegment, segmentV1Q1,
segmentV1Q2, segmentV1Q3, segmentV1Q4);
+ verifyIntervalHasVisibleSegments(YEAR_23, appendedSegment, segmentV1Q1,
segmentV1Q2, segmentV1Q3, segmentV1Q4);
+ }
+
+ @Test
+ public void testLockAllocateAppendYearReplaceQuarter()
+ {
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+ Assert.assertNotNull(replaceLock);
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
+ Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+
+ final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23,
replaceLock.getVersion());
+
+ Assert.assertFalse(
+ replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4)
+ .isSuccess()
+ );
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+ }
+
+ @Test
+ public void testLockAllocateReplaceQuarterAppendYear()
+ {
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+ Assert.assertNotNull(replaceLock);
+
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
+ Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23,
replaceLock.getVersion());
+
+ Assert.assertFalse(
+ replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4)
+ .isSuccess()
+ );
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+ }
+
+ @Test
+ public void testAllocateLockReplaceQuarterAppendYear()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
+ Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+ Assert.assertNotNull(replaceLock);
+
+ final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23,
replaceLock.getVersion());
+
+ Assert.assertFalse(
+ replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4)
+ .isSuccess()
+ );
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+ }
+
+ @Test
+ public void testAllocateLockAppendYearReplaceQuarter()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
+ Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+ Assert.assertNotNull(replaceLock);
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+
+ final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23,
replaceLock.getVersion());
+
+ Assert.assertFalse(
+ replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4)
+ .isSuccess()
+ );
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+ }
+
+ @Test
+ public void testAllocateAppendLockYearReplaceQuarter()
+ {
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(),
Granularities.YEAR);
+ Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+ Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segmentV01);
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+
+ final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+ Assert.assertNotNull(replaceLock);
+
+ final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23,
replaceLock.getVersion());
+ final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23,
replaceLock.getVersion());
+
+ Assert.assertTrue(
+ replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4)
+ .isSuccess()
+ );
+
+ verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV1Q1,
segmentV1Q2, segmentV1Q3, segmentV1Q4);
+ verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2,
segmentV1Q3, segmentV1Q4);
+ }
+
@Test
public void testAllocateAppendMonthLockReplaceDay()
{
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 463232012ed..a512f793574 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -900,7 +901,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
} else if (pendingSegment.getId().getVersion().compareTo(replaceVersion)
>= 0) {
return false;
} else if
(!replaceInterval.contains(pendingSegment.getId().getInterval())) {
- return false;
+ final SegmentId pendingSegmentId = pendingSegment.getId().asSegmentId();
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.UNSUPPORTED)
+ .build(
+ "Replacing with a finer segment granularity than
a concurrent append is unsupported."
+ + " Cannot upgrade pendingSegment[%s] to
version[%s] as the replace interval[%s]"
+ + " does not fully contain the pendingSegment
interval[%s].",
+ pendingSegmentId, replaceVersion,
replaceInterval, pendingSegmentId.getInterval()
+ );
} else {
// Do not upgrade already upgraded pending segment
return pendingSegment.getSequenceName() == null
@@ -2200,10 +2209,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
newInterval = replaceInterval;
break;
} else if (replaceInterval.overlaps(oldInterval)) {
- throw new ISE(
- "Incompatible segment intervals for commit: [%s] and [%s].",
- oldInterval, replaceInterval
- );
+ final String conflictingSegmentId = oldSegment.getId().toString();
+ final String upgradeVersion =
upgradeSegmentToLockVersion.get(conflictingSegmentId);
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.UNSUPPORTED)
+ .build(
+ "Replacing with a finer segment granularity
than a concurrent append is unsupported."
+ + " Cannot upgrade segment[%s] to
version[%s] as the replace interval[%s]"
+ + " does not fully contain the pending
segment interval[%s].",
+ conflictingSegmentId, upgradeVersion,
replaceInterval, oldInterval
+ );
}
}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index f82cfbf2a04..4b592e5f40d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -319,6 +319,73 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assert.assertEquals(replaceLock.getVersion(),
Iterables.getOnlyElement(observedLockVersions));
}
+ @Test
+ public void
testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported()
+ {
+ final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1",
Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
+ final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
+ final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap =
new HashMap<>();
+ final PendingSegmentRecord pendingSegmentForInterval = new
PendingSegmentRecord(
+ new SegmentIdWithShardSpec(
+ "foo",
+ Intervals.of("2023-01-01/2024-01-01"),
+ "2023-01-02",
+ new NumberedShardSpec(100, 0)
+ ),
+ "",
+ "",
+ null,
+ "append"
+ );
+ for (int i = 1; i < 9; i++) {
+ final DataSegment segment = new DataSegment(
+ "foo",
+ Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
+ "2023-01-0" + i,
+ ImmutableMap.of("path", "a-" + i),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(0),
+ 9,
+ 100
+ );
+ segmentsAppendedWithReplaceLock.add(segment);
+ appendedSegmentToReplaceLockMap.put(segment, replaceLock);
+ }
+
+ segmentSchemaTestUtils.insertUsedSegments(segmentsAppendedWithReplaceLock,
Collections.emptyMap());
+ derbyConnector.retryWithHandle(
+ handle -> coordinator.insertPendingSegmentsIntoMetastore(
+ handle,
+ ImmutableList.of(pendingSegmentForInterval),
+ "foo",
+ true
+ )
+ );
+ insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap,
derbyConnectorRule.metadataTablesConfigSupplier().get());
+
+ final Set<DataSegment> replacingSegments = new HashSet<>();
+ for (int i = 1; i < 9; i++) {
+ final DataSegment segment = new DataSegment(
+ "foo",
+ Intervals.of("2023-01-01/2023-02-01"),
+ "2023-02-01",
+ ImmutableMap.of("path", "b-" + i),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new NumberedShardSpec(i, 9),
+ 9,
+ 100
+ );
+ replacingSegments.add(segment);
+ }
+
+ Assert.assertFalse(
+ coordinator.commitReplaceSegments(replacingSegments,
ImmutableSet.of(replaceLock), null)
+ .isSuccess()
+ );
+ }
+
@Test
public void testCommitReplaceSegments()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]