This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new feeb4f0fb03 Allocate pending segments at latest committed version
(#15459)
feeb4f0fb03 is described below
commit feeb4f0fb03fce90e523c7e1c10e71a19478400c
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Dec 14 16:18:39 2023 +0530
Allocate pending segments at latest committed version (#15459)
The segment allocation algorithm reuses an already allocated pending
segment if the new allocation request is made for the same parameters:
same datasource
same sequence name
same interval
same value of skipSegmentLineageCheck (false for batch append, true for
streaming append)
same previous segment id (used only when skipSegmentLineageCheck = false)
The above parameters can thus uniquely identify a pending segment (enforced
by the UNIQUE constraint on the sequence_name_prev_id_sha1 column in the
druid_pendingSegments metadata table).
This reuse is done in order to:
- allow replica tasks (in case of streaming ingestion) to use the same set of
segment IDs.
- allow re-run of a failed batch task to use the same segment IDs and prevent
unnecessary allocations.
---
.../common/actions/SegmentAllocateActionTest.java | 71 ++-
.../IndexerSQLMetadataStorageCoordinator.java | 522 ++++++++++++---------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 2 -
3 files changed, 365 insertions(+), 230 deletions(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index 13c499e47e2..4ccb8707750 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -55,11 +55,11 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -70,9 +70,6 @@ import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class SegmentAllocateActionTest
{
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
@Rule
public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
@@ -403,6 +400,72 @@ public class SegmentAllocateActionTest
assertSameIdentifier(id2, id7);
}
+ @Test
+ public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws
IOException
+ {
+ final Task task = NoopTask.create();
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final String sequenceName = "sequence_1";
+
+ // Allocate segments when there are no committed segments
+ final SegmentIdWithShardSpec pendingSegmentV01 =
+ allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR,
sequenceName, null);
+ final SegmentIdWithShardSpec pendingSegmentV02 =
+ allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR,
sequenceName, null);
+
+ assertSameIdentifier(pendingSegmentV01, pendingSegmentV02);
+
+ // Commit a segment for version V1
+ final DataSegment segmentV1
+ = DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularities.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.plusDays(1).toString())
+ .shardSpec(new LinearShardSpec(0))
+ .size(100)
+ .build();
+ taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
+ Collections.singleton(segmentV1)
+ );
+
+ // Verify that new allocations use version V1
+ final SegmentIdWithShardSpec pendingSegmentV11 =
+ allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR,
sequenceName, null);
+ final SegmentIdWithShardSpec pendingSegmentV12 =
+ allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR,
sequenceName, null);
+
+ assertSameIdentifier(pendingSegmentV11, pendingSegmentV12);
+ Assert.assertEquals(segmentV1.getVersion(),
pendingSegmentV11.getVersion());
+
+ Assert.assertNotEquals(pendingSegmentV01, pendingSegmentV11);
+
+ // Commit a segment for version V2 to overshadow V1
+ final DataSegment segmentV2
+ = DataSegment.builder()
+ .dataSource(DATA_SOURCE)
+ .interval(Granularities.HOUR.bucket(PARTY_TIME))
+ .version(PARTY_TIME.plusDays(2).toString())
+ .shardSpec(new LinearShardSpec(0))
+ .size(100)
+ .build();
+ taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
+ Collections.singleton(segmentV2)
+ );
+ Assert.assertTrue(segmentV2.getVersion().compareTo(segmentV1.getVersion())
> 0);
+
+ // Verify that new segment allocations use version V2
+ final SegmentIdWithShardSpec pendingSegmentV21 =
+ allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR,
sequenceName, null);
+ final SegmentIdWithShardSpec pendingSegmentV22 =
+ allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR,
sequenceName, null);
+ assertSameIdentifier(pendingSegmentV21, pendingSegmentV22);
+ Assert.assertEquals(segmentV2.getVersion(),
pendingSegmentV21.getVersion());
+
+ Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV01);
+ Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV11);
+ }
+
@Test
public void testMultipleSequences()
{
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c62e59c0b25..9e4fc578eda 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -645,10 +645,23 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "version");
- Interval allocateInterval =
interval.withChronology(ISOChronology.getInstanceUTC());
+ final Interval allocateInterval =
interval.withChronology(ISOChronology.getInstanceUTC());
return connector.retryWithHandle(
handle -> {
+ // Get the time chunk and associated data segments for the given
interval, if any
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks
=
+ getTimelineForIntervalsWithHandle(handle, dataSource,
ImmutableList.of(interval))
+ .lookup(interval);
+ if (existingChunks.size() > 1) {
+ // Not possible to expand more than one chunk with a single
segment.
+ log.warn(
+ "Cannot allocate new segment for dataSource[%s], interval[%s]
as it already has [%,d] versions.",
+ dataSource, interval, existingChunks.size()
+ );
+ return null;
+ }
+
if (skipSegmentLineageCheck) {
return allocatePendingSegment(
handle,
@@ -656,7 +669,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
sequenceName,
allocateInterval,
partialShardSpec,
- maxVersion
+ maxVersion,
+ existingChunks
);
} else {
return allocatePendingSegmentWithSegmentLineageCheck(
@@ -666,7 +680,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
previousSegmentId,
allocateInterval,
partialShardSpec,
- maxVersion
+ maxVersion,
+ existingChunks
);
}
}
@@ -803,26 +818,32 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
@Nullable final String previousSegmentId,
final Interval interval,
final PartialShardSpec partialShardSpec,
- final String maxVersion
+ final String maxVersion,
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" :
previousSegmentId;
- final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
- handle.createQuery(
- StringUtils.format(
- "SELECT payload FROM %s WHERE "
- + "dataSource = :dataSource AND "
- + "sequence_name = :sequence_name AND "
- + "sequence_prev_id = :sequence_prev_id",
- dbTables.getPendingSegmentsTable()
- )
- ),
+
+ final String sql = StringUtils.format(
+ "SELECT payload FROM %s WHERE "
+ + "dataSource = :dataSource AND "
+ + "sequence_name = :sequence_name AND "
+ + "sequence_prev_id = :sequence_prev_id",
+ dbTables.getPendingSegmentsTable()
+ );
+ final Query<Map<String, Object>> query
+ = handle.createQuery(sql)
+ .bind("dataSource", dataSource)
+ .bind("sequence_name", sequenceName)
+ .bind("sequence_prev_id", previousSegmentIdNotNull);
+
+ final String usedSegmentVersion = existingChunks.isEmpty() ? null :
existingChunks.get(0).getVersion();
+ final CheckExistingSegmentIdResult result = findExistingPendingSegment(
+ query,
interval,
sequenceName,
previousSegmentIdNotNull,
- Pair.of("dataSource", dataSource),
- Pair.of("sequence_name", sequenceName),
- Pair.of("sequence_prev_id", previousSegmentIdNotNull)
+ usedSegmentVersion
);
if (result.found) {
@@ -835,7 +856,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
dataSource,
interval,
partialShardSpec,
- maxVersion
+ maxVersion,
+ existingChunks
);
if (newIdentifier == null) {
return null;
@@ -854,6 +876,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
+ .putByte((byte) 0xff)
+ .putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
.hash()
.asBytes()
);
@@ -878,11 +902,26 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final List<SegmentCreateRequest> requests
) throws IOException
{
+ // Get the time chunk and associated data segments for the given interval,
if any
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
+ getTimelineForIntervalsWithHandle(handle, dataSource,
Collections.singletonList(interval))
+ .lookup(interval);
+ if (existingChunks.size() > 1) {
+ log.warn(
+ "Cannot allocate new segments for dataSource[%s], interval[%s] as
interval already has [%,d] chunks.",
+ dataSource, interval, existingChunks.size()
+ );
+ return Collections.emptyMap();
+ }
+
+ final String existingVersion = existingChunks.isEmpty() ? null :
existingChunks.get(0).getVersion();
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult>
existingSegmentIds;
if (skipSegmentLineageCheck) {
- existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle,
dataSource, interval, requests);
+ existingSegmentIds =
+ getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval,
existingVersion, requests);
} else {
- existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle,
dataSource, interval, requests);
+ existingSegmentIds =
+ getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval,
existingVersion, requests);
}
// For every request see if a segment id already exists
@@ -901,8 +940,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
// For each of the remaining requests, create a new segment
- final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
- createNewSegments(handle, dataSource, interval,
skipSegmentLineageCheck, requestsForNewSegments);
+ final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
createNewSegments(
+ handle,
+ dataSource,
+ interval,
+ skipSegmentLineageCheck,
+ existingChunks,
+ requestsForNewSegments
+ );
// SELECT -> INSERT can fail due to races; callers must be prepared to
retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
@@ -925,14 +970,16 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
@SuppressWarnings("UnstableApiUsage")
private String getSequenceNameAndPrevIdSha(
SegmentCreateRequest request,
- Interval interval,
+ SegmentIdWithShardSpec pendingSegmentId,
boolean skipSegmentLineageCheck
)
{
final Hasher hasher = Hashing.sha1().newHasher()
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
.putByte((byte) 0xff);
+
if (skipSegmentLineageCheck) {
+ final Interval interval = pendingSegmentId.getInterval();
hasher
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis());
@@ -941,6 +988,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
}
+ hasher.putByte((byte) 0xff);
+ hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion()));
+
return BaseEncoding.base16().encode(hasher.hash().asBytes());
}
@@ -951,28 +1001,32 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String sequenceName,
final Interval interval,
final PartialShardSpec partialShardSpec,
- final String maxVersion
+ final String maxVersion,
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
- final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
- handle.createQuery(
- StringUtils.format(
- "SELECT payload FROM %s WHERE "
- + "dataSource = :dataSource AND "
- + "sequence_name = :sequence_name AND "
- + "start = :start AND "
- + "%2$send%2$s = :end",
- dbTables.getPendingSegmentsTable(),
- connector.getQuoteString()
- )
- ),
+ final String sql = StringUtils.format(
+ "SELECT payload FROM %s WHERE "
+ + "dataSource = :dataSource AND "
+ + "sequence_name = :sequence_name AND "
+ + "start = :start AND "
+ + "%2$send%2$s = :end",
+ dbTables.getPendingSegmentsTable(),
+ connector.getQuoteString()
+ );
+ final Query<Map<String, Object>> query
+ = handle.createQuery(sql)
+ .bind("dataSource", dataSource)
+ .bind("sequence_name", sequenceName)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+
+ final CheckExistingSegmentIdResult result = findExistingPendingSegment(
+ query,
interval,
sequenceName,
null,
- Pair.of("dataSource", dataSource),
- Pair.of("sequence_name", sequenceName),
- Pair.of("start", interval.getStart().toString()),
- Pair.of("end", interval.getEnd().toString())
+ existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion()
);
if (result.found) {
@@ -984,7 +1038,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
dataSource,
interval,
partialShardSpec,
- maxVersion
+ maxVersion,
+ existingChunks
);
if (newIdentifier == null) {
return null;
@@ -1004,6 +1059,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.putByte((byte) 0xff)
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis())
+ .putByte((byte) 0xff)
+ .putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
.hash()
.asBytes()
);
@@ -1011,7 +1068,10 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// always insert empty previous sequence id
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource,
interval, "", sequenceName, sequenceNamePrevIdSha1);
- log.info("Allocated pending segment [%s] for sequence[%s] in DB",
newIdentifier, sequenceName);
+ log.info(
+ "Created new pending segment[%s] for datasource[%s], sequence[%s],
interval[%s].",
+ newIdentifier, dataSource, sequenceName, interval
+ );
return newIdentifier;
}
@@ -1023,6 +1083,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Handle handle,
String dataSource,
Interval interval,
+ String usedSegmentVersion,
List<SegmentCreateRequest> requests
) throws IOException
{
@@ -1052,7 +1113,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final PendingSegmentsRecord record = dbSegments.next();
final SegmentIdWithShardSpec segmentId =
jsonMapper.readValue(record.getPayload(),
SegmentIdWithShardSpec.class);
- sequenceToSegmentId.put(record.getSequenceName(), segmentId);
+
+ // Consider only the pending segments allocated for the latest used
segment version
+ if (usedSegmentVersion == null ||
segmentId.getVersion().equals(usedSegmentVersion)) {
+ sequenceToSegmentId.put(record.getSequenceName(), segmentId);
+ }
}
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult>
requestToResult = new HashMap<>();
@@ -1071,6 +1136,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Handle handle,
String dataSource,
Interval interval,
+ String usedSegmentVersion,
List<SegmentCreateRequest> requests
) throws IOException
{
@@ -1090,14 +1156,15 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult>
requestToResult = new HashMap<>();
for (SegmentCreateRequest request : requests) {
- CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
+ CheckExistingSegmentIdResult result = findExistingPendingSegment(
handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("sequence_name", request.getSequenceName())
.bind("sequence_prev_id", request.getPreviousSegmentId()),
interval,
request.getSequenceName(),
- request.getPreviousSegmentId()
+ request.getPreviousSegmentId(),
+ usedSegmentVersion
);
requestToResult.put(request, result);
}
@@ -1105,50 +1172,43 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return requestToResult;
}
- private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(
+ private CheckExistingSegmentIdResult findExistingPendingSegment(
final Query<Map<String, Object>> query,
final Interval interval,
final String sequenceName,
final @Nullable String previousSegmentId,
- final Pair<String, String>... queryVars
+ final @Nullable String usedSegmentVersion
) throws IOException
{
- Query<Map<String, Object>> boundQuery = query;
- for (Pair<String, String> var : queryVars) {
- boundQuery = boundQuery.bind(var.lhs, var.rhs);
- }
- final List<byte[]> existingBytes =
boundQuery.map(ByteArrayMapper.FIRST).list();
-
- if (existingBytes.isEmpty()) {
+ final List<byte[]> records = query.map(ByteArrayMapper.FIRST).list();
+ if (records.isEmpty()) {
return new CheckExistingSegmentIdResult(false, null);
- } else {
- final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue(
- Iterables.getOnlyElement(existingBytes),
- SegmentIdWithShardSpec.class
- );
-
- if (existingIdentifier.getInterval().isEqual(interval)) {
- log.info(
- "Found existing pending segment [%s] for sequence[%s] (previous =
[%s]) in DB",
- existingIdentifier,
- sequenceName,
- previousSegmentId
- );
+ }
- return new CheckExistingSegmentIdResult(true, existingIdentifier);
- } else {
- log.warn(
- "Cannot use existing pending segment [%s] for sequence[%s]
(previous = [%s]) in DB, "
- + "does not match requested interval[%s]",
- existingIdentifier,
- sequenceName,
- previousSegmentId,
- interval
- );
+ for (byte[] record : records) {
+ final SegmentIdWithShardSpec pendingSegment
+ = jsonMapper.readValue(record, SegmentIdWithShardSpec.class);
- return new CheckExistingSegmentIdResult(true, null);
+ // Consider only pending segments matching the expected version
+ if (usedSegmentVersion == null ||
pendingSegment.getVersion().equals(usedSegmentVersion)) {
+ if (pendingSegment.getInterval().isEqual(interval)) {
+ log.info(
+ "Found existing pending segment[%s] for sequence[%s], previous
segment[%s], version[%s] in DB",
+ pendingSegment, sequenceName, previousSegmentId,
usedSegmentVersion
+ );
+ return new CheckExistingSegmentIdResult(true, pendingSegment);
+ } else {
+ log.warn(
+ "Cannot use existing pending segment [%s] for sequence[%s],
previous segment[%s] in DB"
+ + " as it does not match requested interval[%s], version[%s].",
+ pendingSegment, sequenceName, previousSegmentId, interval,
usedSegmentVersion
+ );
+ return new CheckExistingSegmentIdResult(true, null);
+ }
}
}
+
+ return new CheckExistingSegmentIdResult(false, null);
}
private static class CheckExistingSegmentIdResult
@@ -1164,6 +1224,52 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
+ private static class UniqueAllocateRequest
+ {
+ private final Interval interval;
+ private final String previousSegmentId;
+ private final String sequenceName;
+ private final boolean skipSegmentLineageCheck;
+
+ private final int hashCode;
+
+ public UniqueAllocateRequest(
+ Interval interval,
+ SegmentCreateRequest request,
+ boolean skipSegmentLineageCheck
+ )
+ {
+ this.interval = interval;
+ this.sequenceName = request.getSequenceName();
+ this.previousSegmentId = request.getPreviousSegmentId();
+ this.skipSegmentLineageCheck = skipSegmentLineageCheck;
+
+ this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId,
skipSegmentLineageCheck);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UniqueAllocateRequest that = (UniqueAllocateRequest) o;
+ return skipSegmentLineageCheck == that.skipSegmentLineageCheck
+ && Objects.equals(interval, that.interval)
+ && Objects.equals(sequenceName, that.sequenceName)
+ && Objects.equals(previousSegmentId, that.previousSegmentId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return hashCode;
+ }
+ }
+
private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@@ -1264,7 +1370,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.bind("sequence_prev_id", request.getPreviousSegmentId())
.bind(
"sequence_name_prev_id_sha1",
- getSequenceNameAndPrevIdSha(request, interval,
skipSegmentLineageCheck)
+ getSequenceNameAndPrevIdSha(request, segmentId,
skipSegmentLineageCheck)
)
.bind("payload", jsonMapper.writeValueAsBytes(segmentId));
}
@@ -1480,6 +1586,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
+ List<TimelineObjectHolder<String, DataSegment>> existingChunks,
List<SegmentCreateRequest> requests
) throws IOException
{
@@ -1487,22 +1594,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return Collections.emptyMap();
}
- // Get the time chunk and associated data segments for the given interval,
if any
- final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
- getTimelineForIntervalsWithHandle(handle, dataSource,
Collections.singletonList(interval))
- .lookup(interval);
-
- if (existingChunks.size() > 1) {
- // Not possible to expand more than one chunk with a single segment.
- log.warn(
- "Cannot allocate new segments for dataSource[%s], interval[%s]:
already have [%,d] chunks.",
- dataSource,
- interval,
- existingChunks.size()
- );
- return Collections.emptyMap();
- }
-
// Shard spec of any of the requests (as they are all compatible) can be
used to
// identify existing shard specs that share partition space with the
requested ones.
final PartialShardSpec partialShardSpec =
requests.get(0).getPartialShardSpec();
@@ -1542,15 +1633,16 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle,
dataSource, interval).keySet());
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
new HashMap<>();
- final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new
HashMap<>();
+ final Map<UniqueAllocateRequest, SegmentIdWithShardSpec>
uniqueRequestToSegment = new HashMap<>();
for (SegmentCreateRequest request : requests) {
// Check if the required segment has already been created in this batch
- final String sequenceHash = getSequenceNameAndPrevIdSha(request,
interval, skipSegmentLineageCheck);
+ final UniqueAllocateRequest uniqueRequest =
+ new UniqueAllocateRequest(interval, request,
skipSegmentLineageCheck);
final SegmentIdWithShardSpec createdSegment;
- if (sequenceHashToSegment.containsKey(sequenceHash)) {
- createdSegment = sequenceHashToSegment.get(sequenceHash);
+ if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
+ createdSegment = uniqueRequestToSegment.get(uniqueRequest);
} else {
createdSegment = createNewSegment(
request,
@@ -1564,8 +1656,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// Add to pendingSegments to consider for partitionId
if (createdSegment != null) {
pendingSegments.add(createdSegment);
- sequenceHashToSegment.put(sequenceHash, createdSegment);
- log.info("Created new segment [%s]", createdSegment);
+ uniqueRequestToSegment.put(uniqueRequest, createdSegment);
+ log.info("Created new segment[%s]", createdSegment);
}
}
@@ -1574,7 +1666,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- log.info("Created [%d] new segments for [%d] allocate requests.",
sequenceHashToSegment.size(), requests.size());
+ log.info("Created [%d] new segments for [%d] allocate requests.",
uniqueRequestToSegment.size(), requests.size());
return createdSegments;
}
@@ -1694,140 +1786,122 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String dataSource,
final Interval interval,
final PartialShardSpec partialShardSpec,
- final String existingVersion
+ final String existingVersion,
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
- // Get the time chunk and associated data segments for the given interval,
if any
- final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(
- handle,
- dataSource,
- ImmutableList.of(interval)
- ).lookup(interval);
-
- if (existingChunks.size() > 1) {
- // Not possible to expand more than one chunk with a single segment.
- log.warn(
- "Cannot allocate new segment for dataSource[%s], interval[%s]:
already have [%,d] chunks.",
- dataSource,
- interval,
- existingChunks.size()
- );
- return null;
+ // max partitionId of published data segments which share the same
partition space.
+ SegmentIdWithShardSpec committedMaxId = null;
+ @Nullable
+ final String versionOfExistingChunk;
+ if (existingChunks.isEmpty()) {
+ versionOfExistingChunk = null;
} else {
- // max partitionId of published data segments which share the same
partition space.
- SegmentIdWithShardSpec committedMaxId = null;
+ TimelineObjectHolder<String, DataSegment> existingHolder =
Iterables.getOnlyElement(existingChunks);
+ versionOfExistingChunk = existingHolder.getVersion();
- @Nullable
- final String versionOfExistingChunk;
- if (existingChunks.isEmpty()) {
- versionOfExistingChunk = null;
- } else {
- TimelineObjectHolder<String, DataSegment> existingHolder =
Iterables.getOnlyElement(existingChunks);
- versionOfExistingChunk = existingHolder.getVersion();
-
- // Don't use the stream API for performance.
- for (DataSegment segment : FluentIterable
- .from(existingHolder.getObject())
- .transform(PartitionChunk::getObject)
- // Here we check only the segments of the shardSpec which shares
the same partition space with the given
- // partialShardSpec. Note that OverwriteShardSpec doesn't share
the partition space with others.
- // See PartitionIds.
- .filter(segment ->
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
- if (committedMaxId == null
- || committedMaxId.getShardSpec().getPartitionNum() <
segment.getShardSpec().getPartitionNum()) {
- committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
- }
+ // Don't use the stream API for performance.
+ for (DataSegment segment : FluentIterable
+ .from(existingHolder.getObject())
+ .transform(PartitionChunk::getObject)
+ // Here we check only the segments of the shardSpec which shares the
same partition space with the given
+ // partialShardSpec. Note that OverwriteShardSpec doesn't share the
partition space with others.
+ // See PartitionIds.
+ .filter(segment ->
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
+ if (committedMaxId == null
+ || committedMaxId.getShardSpec().getPartitionNum() <
segment.getShardSpec().getPartitionNum()) {
+ committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
+ }
- // Fetch the pending segments for this interval to determine max
partitionId
- // across all shard specs (published + pending).
- // A pending segment having a higher partitionId must also be considered
- // to avoid clashes when inserting the pending segment created here.
- final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
- getPendingSegmentsForIntervalWithHandle(handle, dataSource,
interval).keySet()
- );
- if (committedMaxId != null) {
- pendings.add(committedMaxId);
- }
+ // Fetch the pending segments for this interval to determine max
partitionId
+ // across all shard specs (published + pending).
+ // A pending segment having a higher partitionId must also be considered
+ // to avoid clashes when inserting the pending segment created here.
+ final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
+ getPendingSegmentsForIntervalWithHandle(handle, dataSource,
interval).keySet()
+ );
+ if (committedMaxId != null) {
+ pendings.add(committedMaxId);
+ }
- // If there is an existing chunk, find the max id with the same version
as the existing chunk.
- // There may still be a pending segment with a higher version (but no
corresponding used segments)
- // which may generate a clash with an existing segment once the new id
is generated
- final SegmentIdWithShardSpec overallMaxId;
- overallMaxId = pendings.stream()
- .filter(id ->
id.getShardSpec().sharePartitionSpace(partialShardSpec))
- .filter(id -> versionOfExistingChunk == null
- ||
id.getVersion().equals(versionOfExistingChunk))
-
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
- .thenComparing(id ->
id.getShardSpec().getPartitionNum()))
- .orElse(null);
-
-
- // Determine the version of the new segment
- final String newSegmentVersion;
- if (versionOfExistingChunk != null) {
- newSegmentVersion = versionOfExistingChunk;
- } else if (overallMaxId != null) {
- newSegmentVersion = overallMaxId.getVersion();
- } else {
- // this is the first segment for this interval
- newSegmentVersion = null;
- }
+ // If there is an existing chunk, find the max id with the same version as
the existing chunk.
+ // There may still be a pending segment with a higher version (but no
corresponding used segments)
+ // which may generate a clash with an existing segment once the new id is
generated
+ final SegmentIdWithShardSpec overallMaxId;
+ overallMaxId = pendings.stream()
+ .filter(id ->
id.getShardSpec().sharePartitionSpace(partialShardSpec))
+ .filter(id -> versionOfExistingChunk == null
+ ||
id.getVersion().equals(versionOfExistingChunk))
+
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
+ .thenComparing(id ->
id.getShardSpec().getPartitionNum()))
+ .orElse(null);
- if (overallMaxId == null) {
- // When appending segments, null overallMaxId means that we are
allocating the very initial
- // segment for this time chunk.
- // This code is executed when the Overlord coordinates segment
allocation, which is either you append segments
- // or you use segment lock. Since the core partitions set is not
determined for appended segments, we set
- // it 0. When you use segment lock, the core partitions set doesn't
work with it. We simply set it 0 so that the
- // OvershadowableManager handles the atomic segment update.
- final int newPartitionId =
partialShardSpec.useNonRootGenerationPartitionSpace()
- ?
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
- : PartitionIds.ROOT_GEN_START_PARTITION_ID;
- String version = newSegmentVersion == null ? existingVersion :
newSegmentVersion;
- return new SegmentIdWithShardSpec(
- dataSource,
- interval,
- version,
- partialShardSpec.complete(jsonMapper, newPartitionId, 0)
- );
- } else if (!overallMaxId.getInterval().equals(interval)) {
- log.warn(
- "Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
- dataSource,
- interval,
- existingVersion,
- overallMaxId
- );
- return null;
- } else if (committedMaxId != null
- && committedMaxId.getShardSpec().getNumCorePartitions()
- == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
- log.warn(
- "Cannot allocate new segment because of unknown core partition
size of segment[%s], shardSpec[%s]",
- committedMaxId,
- committedMaxId.getShardSpec()
- );
- return null;
- } else {
- // The number of core partitions must always be chosen from the set of
used segments in the SegmentTimeline.
- // When the core partitions have been dropped, using pending segments
may lead to an incorrect state
- // where the chunk is believed to have core partitions and queries
results are incorrect.
- return new SegmentIdWithShardSpec(
- dataSource,
- interval,
- Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
- partialShardSpec.complete(
- jsonMapper,
- overallMaxId.getShardSpec().getPartitionNum() + 1,
- committedMaxId == null ? 0 :
committedMaxId.getShardSpec().getNumCorePartitions()
- )
- );
- }
+ // Determine the version of the new segment
+ final String newSegmentVersion;
+ if (versionOfExistingChunk != null) {
+ newSegmentVersion = versionOfExistingChunk;
+ } else if (overallMaxId != null) {
+ newSegmentVersion = overallMaxId.getVersion();
+ } else {
+ // this is the first segment for this interval
+ newSegmentVersion = null;
+ }
+
+ if (overallMaxId == null) {
+ // When appending segments, null overallMaxId means that we are
allocating the very initial
+ // segment for this time chunk.
+ // This code is executed when the Overlord coordinates segment
allocation, which is either you append segments
+ // or you use segment lock. Since the core partitions set is not
determined for appended segments, we set
+ // it 0. When you use segment lock, the core partitions set doesn't work
with it. We simply set it 0 so that the
+ // OvershadowableManager handles the atomic segment update.
+ final int newPartitionId =
partialShardSpec.useNonRootGenerationPartitionSpace()
+ ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
+ : PartitionIds.ROOT_GEN_START_PARTITION_ID;
+ String version = newSegmentVersion == null ? existingVersion :
newSegmentVersion;
+ return new SegmentIdWithShardSpec(
+ dataSource,
+ interval,
+ version,
+ partialShardSpec.complete(jsonMapper, newPartitionId, 0)
+ );
+ } else if (!overallMaxId.getInterval().equals(interval)) {
+ log.warn(
+ "Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
+ dataSource,
+ interval,
+ existingVersion,
+ overallMaxId
+ );
+ return null;
+ } else if (committedMaxId != null
+ && committedMaxId.getShardSpec().getNumCorePartitions()
+ == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
+ log.warn(
+ "Cannot allocate new segment because of unknown core partition size
of segment[%s], shardSpec[%s]",
+ committedMaxId,
+ committedMaxId.getShardSpec()
+ );
+ return null;
+ } else {
+ // The number of core partitions must always be chosen from the set of
used segments in the SegmentTimeline.
+ // When the core partitions have been dropped, using pending segments
may lead to an incorrect state
+ // where the chunk is believed to have core partitions and queries
results are incorrect.
+
+ return new SegmentIdWithShardSpec(
+ dataSource,
+ interval,
+ Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
+ partialShardSpec.complete(
+ jsonMapper,
+ overallMaxId.getShardSpec().getPartitionNum() + 1,
+ committedMaxId == null ? 0 :
committedMaxId.getShardSpec().getNumCorePartitions()
+ )
+ );
}
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9e977dec3e8..4ee72e74f92 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2078,7 +2078,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
* - verify that the id for segment5 is correct
* - Later, after the above was dropped, another segment on same interval
was created by the stream but this
* time there was an integrity violation in the pending segments table
because the
- * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle,
String, Interval, PartialShardSpec, String)}
* method returned a segment id that already existed in the pending segments
table
*/
@Test
@@ -2178,7 +2177,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2",
identifier4.toString());
// Since all core partitions have been dropped
Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions());
-
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]