This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e26d0076e55 Fix concurrent append to interval with unused segments (#18230)
e26d0076e55 is described below
commit e26d0076e55859ad30c40886cf0d9716ea950ef6
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 11 23:06:27 2025 +0530
Fix concurrent append to interval with unused segments (#18230)
This is a better approach to the fix in #18216
Changes:
- When allocating the first segment in an interval which already contains an unused segment, use a fresh version rather than reusing the old version (now unused)
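In short, the change picks a fresh version by suffixing the APPEND lock version until it no longer clashes with any unused segment version in the interval. A minimal illustrative sketch (not the exact method added in this patch; the class name, helper name, and exception type are assumptions):

    import java.util.Set;

    class FreshVersionSketch
    {
      // Keep appending the "S" suffix (CONCURRENT_APPEND_VERSION_SUFFIX) to the allocated
      // version until it no longer clashes with any unused segment version in the interval;
      // give up after a bounded number of attempts.
      static String chooseFreshVersion(String allocatedVersion, Set<String> unusedVersions)
      {
        String candidate = allocatedVersion;   // APPEND lock version, e.g. 1970-01-01T00:00:00.000Z
        for (int i = 0; i < 10; ++i) {
          if (!unusedVersions.contains(candidate)) {
            return candidate;                  // no clash, safe to allocate
          }
          candidate += "S";
        }
        throw new IllegalStateException("Too many clashing unused versions; kill them to proceed");
      }
    }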
---
.../concurrent/ConcurrentReplaceAndAppendTest.java | 59 +++++++-
.../org/apache/druid/error/ExceptionMatcher.java | 7 +
.../IndexerSQLMetadataStorageCoordinator.java | 101 +++++++++++--
.../druid/metadata/PendingSegmentRecord.java | 13 ++
.../druid/metadata/SqlSegmentsMetadataQuery.java | 23 +++
.../IndexerSQLMetadataStorageCoordinatorTest.java | 168 +++++++++++++++++++++
6 files changed, 357 insertions(+), 14 deletions(-)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index d052fd65489..be2d351f1c5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -23,6 +23,8 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
@@ -63,6 +65,7 @@ import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
@@ -1120,7 +1123,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
}
@Test
- public void test_concurrentAppend_toIntervalWithUnusedSegments()
+ public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion()
{
// Allocate and commit an APPEND segment
final SegmentIdWithShardSpec pendingSegment
@@ -1141,8 +1144,10 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
// Allocate and commit another APPEND segment
final SegmentIdWithShardSpec pendingSegment2
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
- Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
- Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
+
+ // Verify that the new segment gets a different version
+ Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion());
+ Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
final DataSegment segmentV02 = asSegment(pendingSegment2);
appendTask.commitAppendSegments(segmentV02);
@@ -1152,6 +1157,54 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
}
+ @Test
+ public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries()
+ {
+ final int maxAllowedAppends = 10;
+ final int expectedParitionNum = 0;
+ String expectedVersion = SEGMENT_V0;
+
+ // Allocate, commit, delete, repeat
+ for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") {
+ // Allocate a segment and verify its version and partition number
+ final SegmentIdWithShardSpec pendingSegment
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
+
+ Assert.assertEquals(expectedVersion, pendingSegment.getVersion());
+ Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum());
+
+ // Commit the segment and verify its version and partition number
+ final DataSegment segment = asSegment(pendingSegment);
+ appendTask.commitAppendSegments(segment);
+
+ Assert.assertEquals(expectedVersion, segment.getVersion());
+ Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum());
+
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segment);
+ verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segment);
+
+ // Mark the segment as unused
+ getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
+ verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
+ }
+
+ // Verify that the next attempt fails
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ ISE.class,
+ () -> appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY)
+ ),
+ ExceptionMatcher.of(ISE.class).expectRootCause(
+ DruidExceptionMatcher.internalServerError().expectMessageIs(
+ "Could not allocate segment"
+ + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]"
+ + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])"
+ + " in the interval. Kill the old unused versions to proceed."
+ )
+ )
+ );
+ }
+
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
{
diff --git a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java
index 31195093254..5bc16c47f27 100644
--- a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java
@@ -19,6 +19,7 @@
package org.apache.druid.error;
+import com.google.common.base.Throwables;
import org.apache.druid.matchers.DruidMatchers;
import org.hamcrest.Description;
import org.hamcrest.DiagnosingMatcher;
@@ -75,6 +76,12 @@ public class ExceptionMatcher extends DiagnosingMatcher<Throwable>
return this;
}
+ public ExceptionMatcher expectRootCause(Matcher<Throwable> causeMatcher)
+ {
+ matcherList.add(0, DruidMatchers.fn("rootCause", Throwables::getRootCause, causeMatcher));
+ return this;
+ }
+
@Override
protected boolean matches(Object item, Description mismatchDescription)
{
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index dade1668649..a2c5956772f 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -33,6 +33,7 @@ import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -1419,7 +1420,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
- pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
+ pendingSegmentId = getUniqueIdForPrimaryAllocation(transaction, pendingSegmentId);
return PendingSegmentRecord.create(
pendingSegmentId,
request.getSequenceName(),
@@ -1457,7 +1458,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
)
);
return PendingSegmentRecord.create(
- getTrueAllocatedId(transaction, pendingSegmentId),
+ getUniqueIdForSecondaryAllocation(transaction, pendingSegmentId),
request.getSequenceName(),
request.getPreviousSegmentId(),
null,
@@ -1562,7 +1563,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
- return getTrueAllocatedId(transaction, allocatedId);
+ return getUniqueIdForPrimaryAllocation(transaction, allocatedId);
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
@@ -1590,18 +1591,90 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
- return getTrueAllocatedId(transaction, allocatedId);
+ return getUniqueIdForSecondaryAllocation(transaction, allocatedId);
}
}
/**
- * Verifies that the allocated id doesn't already exist in the druid_segments table.
- * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval
- * Otherwise, use the same id.
- * @param allocatedId The segment allcoted on the basis of used and pending segments
- * @return a segment id that isn't already used by other unused segments
+ * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
+ * any existing unused segment. If an unused segment already exists that matches
+ * the interval and version of the given {@code allocatedId}, a fresh version
+ * is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}.
+ * Such a conflict can happen only if all the segments in this interval created
+ * by a prior APPEND task were marked as unused.
+ * <p>
+ * This method should be called only when allocating the first segment in an interval.
+ */
+ private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation(
+ SegmentMetadataTransaction transaction,
+ SegmentIdWithShardSpec allocatedId
+ )
+ {
+ // Get all the unused segment versions for this datasource and interval
+ final Set<String> unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval(
+ allocatedId.getDataSource(),
+ allocatedId.getInterval()
+ );
+
+ final String allocatedVersion = allocatedId.getVersion();
+ if (!unusedSegmentVersions.contains(allocatedVersion)) {
+ // Nothing to do, this version is new
+ return allocatedId;
+ } else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) {
+ // Version clash should never happen for non-APPEND locks
+ throw DruidException.defensive(
+ "Cannot allocate segment[%s] as there are already some unused
segments"
+ + " for version[%s] in this interval.",
+ allocatedId, allocatedVersion
+ );
+ }
+
+ // Iterate until a new non-clashing version is found
+ boolean foundFreshVersion = false;
+ StringBuilder candidateVersion = new StringBuilder(allocatedId.getVersion());
+ for (int i = 0; i < 10; ++i) {
+ if (unusedSegmentVersions.contains(candidateVersion.toString())) {
+ candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX);
+ } else {
+ foundFreshVersion = true;
+ break;
+ }
+ }
+
+ if (foundFreshVersion) {
+ final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
+ allocatedId.getDataSource(),
+ allocatedId.getInterval(),
+ candidateVersion.toString(),
+ allocatedId.getShardSpec()
+ );
+ log.info(
+ "Created new unique pending segment ID[%s] with version[%s] for
originally allocated ID[%s].",
+ uniqueId, candidateVersion.toString(), allocatedId
+ );
+
+ return uniqueId;
+ } else {
+ throw InternalServerError.exception(
+ "Could not allocate segment[%s] as there are too many clashing
unused"
+ + " versions(upto [%s]) in the interval. Kill the old unused
versions to proceed.",
+ allocatedId, candidateVersion.toString()
+ );
+ }
+ }
+
+ /**
+ * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
+ * any existing unused segment. If an unused segment already exists that matches
+ * the interval, version and partition number of the given {@code allocatedId},
+ * a higher partition number is used. Such a conflict can happen only if some
+ * segments of the underlying version have been marked as unused while others
+ * are still used.
+ * <p>
+ * This method should not be called when allocating the first segment in an
+ * interval.
*/
- private SegmentIdWithShardSpec getTrueAllocatedId(
+ private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation(
SegmentMetadataTransaction transaction,
SegmentIdWithShardSpec allocatedId
)
@@ -1631,7 +1704,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
allocatedId.getShardSpec().getPartitionNum(),
unusedMaxId.getPartitionNum() + 1
);
- return new SegmentIdWithShardSpec(
+ final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion(),
@@ -1640,6 +1713,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
allocatedId.getShardSpec().getNumCorePartitions()
)
);
+ log.info(
+ "Created new unique pending segment ID[%s] with partition number[%s]
for originally allocated ID[%s].",
+ uniqueId, maxPartitionNum, allocatedId
+ );
+
+ return uniqueId;
}
@Override
diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
index 47de746d917..b76160ab9fb 100644
--- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -52,6 +52,19 @@ import java.sql.ResultSet;
*/
public class PendingSegmentRecord
{
+ /**
+ * Default lock version used by concurrent APPEND tasks.
+ */
+ public static final String DEFAULT_VERSION_FOR_CONCURRENT_APPEND = DateTimes.EPOCH.toString();
+
+ /**
+ * Suffix to use to construct fresh segment versions in the event of a clash.
+ * The chosen character {@code S} is just for visual ease so that two versions
+ * are not easily confused for each other.
+ * {@code 1970-01-01T00:00:00.000Z_1} vs {@code 1970-01-01T00:00:00.000ZS_1}.
+ */
+ public static final String CONCURRENT_APPEND_VERSION_SUFFIX = "S";
+
private final SegmentIdWithShardSpec id;
private final String sequenceName;
private final String sequencePrevId;
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 2779c16da17..0eba529c7b4 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -1128,6 +1128,29 @@ public class SqlSegmentsMetadataQuery
return segments.stream().filter(Objects::nonNull).collect(Collectors.toList());
}
+ /**
+ * Retrieves the versions of unused segments which are perfectly aligned with
+ * the given interval.
+ */
+ public Set<String> retrieveUnusedSegmentVersionsWithInterval(String dataSource, Interval interval)
+ {
+ final String sql = StringUtils.format(
+ "SELECT DISTINCT(version) FROM %1$s"
+ + " WHERE dataSource = :dataSource AND used = false"
+ + " AND %2$send%2$s = :end AND start = :start",
+ dbTables.getSegmentsTable(),
+ connector.getQuoteString()
+ );
+ return Set.copyOf(
+ handle.createQuery(sql)
+ .bind("dataSource", dataSource)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString())
+ .mapTo(String.class)
+ .list()
+ );
+ }
+
/**
* Retrieve the used segment for a given id if it exists in the metadata store and null otherwise
*/
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 11dda468a5d..c9ee0e97fe7 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -80,6 +82,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -3123,6 +3126,112 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());
}
+ @Test
+ public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion()
+ {
+ final String wiki = TestDataSource.WIKI;
+ final String appendLockVersion = PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND;
+ final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D");
+
+ // Allocate and commit an APPEND segment
+ final String taskAllocator1 = "taskAlloc1";
+ final SegmentIdWithShardSpec pendingSegment
+ = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator1);
+
+ Assert.assertNotNull(pendingSegment);
+ Assert.assertEquals(appendLockVersion, pendingSegment.getVersion());
+ Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum());
+
+ final DataSegment segmentV01 = asSegment(pendingSegment);
+ coordinator.commitAppendSegments(Set.of(segmentV01), Map.of(), taskAllocator1, null);
+
+ verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV01);
+ verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV01);
+
+ // Mark the segment as unused with a future update time to avoid race conditions
+ final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1);
+ transactionFactory.inReadWriteDatasourceTransaction(
+ wiki,
+ t -> t.markAllSegmentsAsUnused(markUnusedTime)
+ );
+ verifyIntervalHasUsedSegments(wiki, firstOfJan23);
+
+ // Allocate and commit another APPEND segment
+ final String taskAllocator2 = "taskAlloc2";
+ final SegmentIdWithShardSpec pendingSegment2
+ = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator2);
+
+ // Verify that the new segment gets a different version
+ Assert.assertNotNull(pendingSegment2);
+ Assert.assertEquals(appendLockVersion + "S", pendingSegment2.getVersion());
+ Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
+
+ final DataSegment segmentV02 = asSegment(pendingSegment2);
+ coordinator.commitAppendSegments(Set.of(segmentV02), Map.of(), taskAllocator2, null);
+ Assert.assertNotEquals(segmentV01, segmentV02);
+
+ verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV02);
+ verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV02);
+ }
+
+ @Test
+ public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries()
+ {
+ final String wiki = TestDataSource.WIKI;
+ final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D");
+
+ final int maxAllowedAppends = 10;
+ final int expectedParitionNum = 0;
+
+ String expectedVersion = DateTimes.EPOCH.toString();
+
+ // Allocate, commit, delete, repeat
+ for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") {
+ // Allocate a segment and verify its version and partition number
+ final String taskAllocatorId = IdUtils.getRandomId();
+ final SegmentIdWithShardSpec pendingSegment
+ = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocatorId);
+
+ Assert.assertNotNull(pendingSegment);
+ Assert.assertEquals(expectedVersion, pendingSegment.getVersion());
+ Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum());
+
+ // Commit the segment and verify its version and partition number
+ final DataSegment segment = asSegment(pendingSegment);
+ coordinator.commitAppendSegments(Set.of(segment), Map.of(), taskAllocatorId, null);
+
+ Assert.assertEquals(expectedVersion, segment.getVersion());
+ Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum());
+
+ verifyIntervalHasUsedSegments(wiki, firstOfJan23, segment);
+ verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segment);
+
+ // Mark the segment as unused with a future update time to avoid race conditions
+ final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1);
+ transactionFactory.inReadWriteDatasourceTransaction(
+ wiki,
+ t -> t.markAllSegmentsAsUnused(markUnusedTime)
+ );
+ verifyIntervalHasUsedSegments(wiki, firstOfJan23);
+ }
+
+ // Verify that the next attempt fails
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ CallbackFailedException.class,
+ () -> allocatePendingSegmentForAppendTask(wiki, firstOfJan23, IdUtils.getRandomId())
+ ),
+ ExceptionMatcher.of(CallbackFailedException.class).expectRootCause(
+ DruidExceptionMatcher.internalServerError().expectMessageIs(
+ "Could not allocate segment"
+ + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]"
+ + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])"
+ + " in the interval. Kill the old unused versions to proceed."
+ )
+ )
+ );
+ }
+
@Test
public void testDeletePendingSegment() throws InterruptedException
{
@@ -4231,6 +4340,26 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
}
+ private SegmentIdWithShardSpec allocatePendingSegmentForAppendTask(
+ String dataSource,
+ Interval interval,
+ String taskAllocatorId
+ )
+ {
+ return coordinator.allocatePendingSegment(
+ dataSource,
+ interval,
+ true,
+ new SegmentCreateRequest(
+ IdUtils.getRandomId(),
+ null,
+ PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND,
+ NumberedPartialShardSpec.instance(),
+ taskAllocatorId
+ )
+ );
+ }
+
private int insertPendingSegments(
String dataSource,
List<PendingSegmentRecord> pendingSegments,
@@ -4247,4 +4376,43 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
{
insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnectorRule, mapper);
}
+
+ private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
+ {
+ final SegmentId id = pendingSegment.asSegmentId();
+ return new DataSegment(
+ id,
+ Map.of(id.toString(), id.toString()),
+ List.of(),
+ List.of(),
+ pendingSegment.getShardSpec(),
+ null,
+ 0,
+ 0
+ );
+ }
+
+ private void verifyIntervalHasUsedSegments(
+ String dataSource,
+ Interval interval,
+ DataSegment... expectedSegments
+ )
+ {
+ Assert.assertEquals(
+ Set.of(expectedSegments),
+ coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.INCLUDING_OVERSHADOWED)
+ );
+ }
+
+ private void verifyIntervalHasVisibleSegments(
+ String dataSource,
+ Interval interval,
+ DataSegment... expectedSegments
+ )
+ {
+ Assert.assertEquals(
+ Set.of(expectedSegments),
+ coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.ONLY_VISIBLE)
+ );
+ }
}
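For non-first allocations in an interval, the patch keeps the allocated version and instead bumps the partition number past any clashing unused segment, as described in the javadoc of getUniqueIdForSecondaryAllocation above. A minimal sketch of that rule, with an assumed class and method name:

    class PartitionBumpSketch
    {
      // If an unused segment already occupies the allocated partition number,
      // move to one past the highest partition number among the clashing unused segments.
      static int choosePartitionNum(int allocatedPartitionNum, int maxUnusedPartitionNum)
      {
        return Math.max(allocatedPartitionNum, maxUnusedPartitionNum + 1);
      }
    }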
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]