This is an automated email from the ASF dual-hosted git repository. capistrant pushed a commit to branch 34.0.0 in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/34.0.0 by this push: new ae252d9f015 Fix concurrent append to interval with unused segments (#18230) (#18275) ae252d9f015 is described below commit ae252d9f0154ab4546a8d61a4e977231f133021b Author: Lucas Capistrant <capistr...@users.noreply.github.com> AuthorDate: Thu Jul 17 12:23:43 2025 -0500 Fix concurrent append to interval with unused segments (#18230) (#18275) This is a better approach to the fix in #18216 Changes: - When allocating the first segment in an interval which already contains an unused segment, use a fresh version rather than reusing the old version (now unused) Co-authored-by: Kashif Faraz <kashif.fa...@gmail.com> --- .../concurrent/ConcurrentReplaceAndAppendTest.java | 59 +++++++- .../org/apache/druid/error/ExceptionMatcher.java | 7 + .../IndexerSQLMetadataStorageCoordinator.java | 101 +++++++++++-- .../druid/metadata/PendingSegmentRecord.java | 13 ++ .../druid/metadata/SqlSegmentsMetadataQuery.java | 23 +++ .../IndexerSQLMetadataStorageCoordinatorTest.java | 168 +++++++++++++++++++++ 6 files changed, 357 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index d052fd65489..be2d351f1c5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -23,6 +23,8 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.error.ExceptionMatcher; import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskStorageDirTracker; @@ -63,6 +65,7 @@ import org.apache.druid.tasklogs.NoopTaskLogs; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; @@ -1120,7 +1123,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase } @Test - public void test_concurrentAppend_toIntervalWithUnusedSegments() + public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion() { // Allocate and commit an APPEND segment final SegmentIdWithShardSpec pendingSegment @@ -1141,8 +1144,10 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase // Allocate and commit another APPEND segment final SegmentIdWithShardSpec pendingSegment2 = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); - Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion()); - Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum()); + + // Verify that the new segment gets a different version + Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion()); + Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum()); final DataSegment segmentV02 = asSegment(pendingSegment2); appendTask.commitAppendSegments(segmentV02); @@ -1152,6 +1157,54 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02); } + @Test + public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries() + { + final int maxAllowedAppends = 10; + final int expectedParitionNum = 0; + String expectedVersion = SEGMENT_V0; + + // Allocate, commit, delete, repeat + for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") { + // Allocate a segment and verify its version and partition number + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + + Assert.assertEquals(expectedVersion, pendingSegment.getVersion()); + Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum()); + + // Commit the segment and verify its version and partition number + final DataSegment segment = asSegment(pendingSegment); + appendTask.commitAppendSegments(segment); + + Assert.assertEquals(expectedVersion, segment.getVersion()); + Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum()); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segment); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segment); + + // Mark the segment as unused + getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource()); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23); + } + + // Verify that the next attempt fails + MatcherAssert.assertThat( + Assert.assertThrows( + ISE.class, + () -> appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY) + ), + ExceptionMatcher.of(ISE.class).expectRootCause( + DruidExceptionMatcher.internalServerError().expectMessageIs( + "Could not allocate segment" + + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]" + + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])" + + " in the interval. Kill the old unused versions to proceed." + ) + ) + ); + } + @Nullable private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments) { diff --git a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java index 31195093254..5bc16c47f27 100644 --- a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java +++ b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java @@ -19,6 +19,7 @@ package org.apache.druid.error; +import com.google.common.base.Throwables; import org.apache.druid.matchers.DruidMatchers; import org.hamcrest.Description; import org.hamcrest.DiagnosingMatcher; @@ -75,6 +76,12 @@ public class ExceptionMatcher extends DiagnosingMatcher<Throwable> return this; } + public ExceptionMatcher expectRootCause(Matcher<Throwable> causeMatcher) + { + matcherList.add(0, DruidMatchers.fn("rootCause", Throwables::getRootCause, causeMatcher)); + return this; + } + @Override protected boolean matches(Object item, Description mismatchDescription) { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index dade1668649..a2c5956772f 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -33,6 +33,7 @@ import com.google.common.io.BaseEncoding; import com.google.inject.Inject; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InternalServerError; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -1419,7 +1420,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); - pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId); + pendingSegmentId = getUniqueIdForPrimaryAllocation(transaction, pendingSegmentId); return PendingSegmentRecord.create( pendingSegmentId, request.getSequenceName(), @@ -1457,7 +1458,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor ) ); return PendingSegmentRecord.create( - getTrueAllocatedId(transaction, pendingSegmentId), + getUniqueIdForSecondaryAllocation(transaction, pendingSegmentId), request.getSequenceName(), request.getPreviousSegmentId(), null, @@ -1562,7 +1563,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); - return getTrueAllocatedId(transaction, allocatedId); + return getUniqueIdForPrimaryAllocation(transaction, allocatedId); } else if (!overallMaxId.getInterval().equals(interval)) { log.warn( "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", @@ -1590,18 +1591,90 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() ) ); - return getTrueAllocatedId(transaction, allocatedId); + return getUniqueIdForSecondaryAllocation(transaction, allocatedId); } } /** - * Verifies that the allocated id doesn't already exist in the druid_segments table. - * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval - * Otherwise, use the same id. - * @param allocatedId The segment allcoted on the basis of used and pending segments - * @return a segment id that isn't already used by other unused segments + * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with + * any existing unused segment. If an unused segment already exists that matches + * the interval and version of the given {@code allocatedId}, a fresh version + * is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}. + * Such a conflict can happen only if all the segments in this interval created + * by a prior APPEND task were marked as unused. + * <p> + * This method should be called only when allocating the first segment in an interval. + */ + private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation( + SegmentMetadataTransaction transaction, + SegmentIdWithShardSpec allocatedId + ) + { + // Get all the unused segment versions for this datasource and interval + final Set<String> unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval( + allocatedId.getDataSource(), + allocatedId.getInterval() + ); + + final String allocatedVersion = allocatedId.getVersion(); + if (!unusedSegmentVersions.contains(allocatedVersion)) { + // Nothing to do, this version is new + return allocatedId; + } else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) { + // Version clash should never happen for non-APPEND locks + throw DruidException.defensive( + "Cannot allocate segment[%s] as there are already some unused segments" + + " for version[%s] in this interval.", + allocatedId, allocatedVersion + ); + } + + // Iterate until a new non-clashing version is found + boolean foundFreshVersion = false; + StringBuilder candidateVersion = new StringBuilder(allocatedId.getVersion()); + for (int i = 0; i < 10; ++i) { + if (unusedSegmentVersions.contains(candidateVersion.toString())) { + candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX); + } else { + foundFreshVersion = true; + break; + } + } + + if (foundFreshVersion) { + final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec( + allocatedId.getDataSource(), + allocatedId.getInterval(), + candidateVersion.toString(), + allocatedId.getShardSpec() + ); + log.info( + "Created new unique pending segment ID[%s] with version[%s] for originally allocated ID[%s].", + uniqueId, candidateVersion.toString(), allocatedId + ); + + return uniqueId; + } else { + throw InternalServerError.exception( + "Could not allocate segment[%s] as there are too many clashing unused" + + " versions(upto [%s]) in the interval. Kill the old unused versions to proceed.", + allocatedId, candidateVersion.toString() + ); + } + } + + /** + * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with + * any existing unused segment. If an unused segment already exists that matches + * the interval, version and partition number of the given {@code allocatedId}, + * a higher partition number is used. Such a conflict can happen only if some + * segments of the underlying version have been marked as unused while others + * are still used. + * <p> + * This method should not be called when allocating the first segment in an + * interval. */ - private SegmentIdWithShardSpec getTrueAllocatedId( + private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation( SegmentMetadataTransaction transaction, SegmentIdWithShardSpec allocatedId ) @@ -1631,7 +1704,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor allocatedId.getShardSpec().getPartitionNum(), unusedMaxId.getPartitionNum() + 1 ); - return new SegmentIdWithShardSpec( + final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec( allocatedId.getDataSource(), allocatedId.getInterval(), allocatedId.getVersion(), @@ -1640,6 +1713,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor allocatedId.getShardSpec().getNumCorePartitions() ) ); + log.info( + "Created new unique pending segment ID[%s] with partition number[%s] for originally allocated ID[%s].", + uniqueId, maxPartitionNum, allocatedId + ); + + return uniqueId; } @Override diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java index 47de746d917..b76160ab9fb 100644 --- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -52,6 +52,19 @@ import java.sql.ResultSet; */ public class PendingSegmentRecord { + /** + * Default lock version used by concurrent APPEND tasks. + */ + public static final String DEFAULT_VERSION_FOR_CONCURRENT_APPEND = DateTimes.EPOCH.toString(); + + /** + * Suffix to use to construct fresh segment versions in the event of a clash. + * The chosen character {@code S} is just for visual ease so that two versions + * are not easily confused for each other. + * {@code 1970-01-01T00:00:00.000Z_1} vs {@code 1970-01-01T00:00:00.000ZS_1}. + */ + public static final String CONCURRENT_APPEND_VERSION_SUFFIX = "S"; + private final SegmentIdWithShardSpec id; private final String sequenceName; private final String sequencePrevId; diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 2779c16da17..0eba529c7b4 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -1128,6 +1128,29 @@ public class SqlSegmentsMetadataQuery return segments.stream().filter(Objects::nonNull).collect(Collectors.toList()); } + /** + * Retrieves the versions of unused segments which are perfectly aligned with + * the given interval. + */ + public Set<String> retrieveUnusedSegmentVersionsWithInterval(String dataSource, Interval interval) + { + final String sql = StringUtils.format( + "SELECT DISTINCT(version) FROM %1$s" + + " WHERE dataSource = :dataSource AND used = false" + + " AND %2$send%2$s = :end AND start = :start", + dbTables.getSegmentsTable(), + connector.getQuoteString() + ); + return Set.copyOf( + handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .mapTo(String.class) + .list() + ); + } + /** * Retrieve the used segment for a given id if it exists in the metadata store and null otherwise */ diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 11dda468a5d..c9ee0e97fe7 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.ExceptionMatcher; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; @@ -80,6 +82,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -3123,6 +3126,112 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString()); } + @Test + public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion() + { + final String wiki = TestDataSource.WIKI; + final String appendLockVersion = PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND; + final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D"); + + // Allocate and commit an APPEND segment + final String taskAllocator1 = "taskAlloc1"; + final SegmentIdWithShardSpec pendingSegment + = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator1); + + Assert.assertNotNull(pendingSegment); + Assert.assertEquals(appendLockVersion, pendingSegment.getVersion()); + Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + coordinator.commitAppendSegments(Set.of(segmentV01), Map.of(), taskAllocator1, null); + + verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV01); + verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV01); + + // Mark the segment as unused with a future update time to avoid race conditions + final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1); + transactionFactory.inReadWriteDatasourceTransaction( + wiki, + t -> t.markAllSegmentsAsUnused(markUnusedTime) + ); + verifyIntervalHasUsedSegments(wiki, firstOfJan23); + + // Allocate and commit another APPEND segment + final String taskAllocator2 = "taskAlloc2"; + final SegmentIdWithShardSpec pendingSegment2 + = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator2); + + // Verify that the new segment gets a different version + Assert.assertNotNull(pendingSegment2); + Assert.assertEquals(appendLockVersion + "S", pendingSegment2.getVersion()); + Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum()); + + final DataSegment segmentV02 = asSegment(pendingSegment2); + coordinator.commitAppendSegments(Set.of(segmentV02), Map.of(), taskAllocator2, null); + Assert.assertNotEquals(segmentV01, segmentV02); + + verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV02); + verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV02); + } + + @Test + public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries() + { + final String wiki = TestDataSource.WIKI; + final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D"); + + final int maxAllowedAppends = 10; + final int expectedParitionNum = 0; + + String expectedVersion = DateTimes.EPOCH.toString(); + + // Allocate, commit, delete, repeat + for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") { + // Allocate a segment and verify its version and partition number + final String taskAllocatorId = IdUtils.getRandomId(); + final SegmentIdWithShardSpec pendingSegment + = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocatorId); + + Assert.assertNotNull(pendingSegment); + Assert.assertEquals(expectedVersion, pendingSegment.getVersion()); + Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum()); + + // Commit the segment and verify its version and partition number + final DataSegment segment = asSegment(pendingSegment); + coordinator.commitAppendSegments(Set.of(segment), Map.of(), taskAllocatorId, null); + + Assert.assertEquals(expectedVersion, segment.getVersion()); + Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum()); + + verifyIntervalHasUsedSegments(wiki, firstOfJan23, segment); + verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segment); + + // Mark the segment as unused with a future update time to avoid race conditions + final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1); + transactionFactory.inReadWriteDatasourceTransaction( + wiki, + t -> t.markAllSegmentsAsUnused(markUnusedTime) + ); + verifyIntervalHasUsedSegments(wiki, firstOfJan23); + } + + // Verify that the next attempt fails + MatcherAssert.assertThat( + Assert.assertThrows( + CallbackFailedException.class, + () -> allocatePendingSegmentForAppendTask(wiki, firstOfJan23, IdUtils.getRandomId()) + ), + ExceptionMatcher.of(CallbackFailedException.class).expectRootCause( + DruidExceptionMatcher.internalServerError().expectMessageIs( + "Could not allocate segment" + + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]" + + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])" + + " in the interval. Kill the old unused versions to proceed." + ) + ) + ); + } + @Test public void testDeletePendingSegment() throws InterruptedException { @@ -4231,6 +4340,26 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata ); } + private SegmentIdWithShardSpec allocatePendingSegmentForAppendTask( + String dataSource, + Interval interval, + String taskAllocatorId + ) + { + return coordinator.allocatePendingSegment( + dataSource, + interval, + true, + new SegmentCreateRequest( + IdUtils.getRandomId(), + null, + PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND, + NumberedPartialShardSpec.instance(), + taskAllocatorId + ) + ); + } + private int insertPendingSegments( String dataSource, List<PendingSegmentRecord> pendingSegments, @@ -4247,4 +4376,43 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata { insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnectorRule, mapper); } + + private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) + { + final SegmentId id = pendingSegment.asSegmentId(); + return new DataSegment( + id, + Map.of(id.toString(), id.toString()), + List.of(), + List.of(), + pendingSegment.getShardSpec(), + null, + 0, + 0 + ); + } + + private void verifyIntervalHasUsedSegments( + String dataSource, + Interval interval, + DataSegment... expectedSegments + ) + { + Assert.assertEquals( + Set.of(expectedSegments), + coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.INCLUDING_OVERSHADOWED) + ); + } + + private void verifyIntervalHasVisibleSegments( + String dataSource, + Interval interval, + DataSegment... expectedSegments + ) + { + Assert.assertEquals( + Set.of(expectedSegments), + coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.ONLY_VISIBLE) + ); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org