This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 24e5d8a9e81 Refactor: Minor cleanup of segment allocation flow (#17524)
24e5d8a9e81 is described below
commit 24e5d8a9e81fffd656e9544ae16bea53d7e60808
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Dec 12 18:16:57 2024 -0800
Refactor: Minor cleanup of segment allocation flow (#17524)
Changes
--------
- Simplify the arguments of
IndexerMetadataStorageCoordinator.allocatePendingSegment
- Remove field SegmentCreateRequest.upgradedFromSegmentId as it was always
null
- Miscellaneous cleanup
---
.../druid/indexing/overlord/TaskLockbox.java | 80 ++++---
.../druid/indexing/overlord/TaskLockboxTest.java | 4 +-
.../TestIndexerMetadataStorageCoordinator.java | 11 +-
.../timeline/partition/OverwriteShardSpec.java | 1 +
.../druid/timeline/partition/PartitionIds.java | 1 +
.../IndexerMetadataStorageCoordinator.java | 22 +-
.../indexing/overlord/SegmentCreateRequest.java | 20 +-
.../IndexerSQLMetadataStorageCoordinator.java | 242 ++++++++-------------
.../overlord/SegmentCreateRequestTest.java | 1 -
.../IndexerSQLMetadataStorageCoordinatorTest.java | 102 +++++----
10 files changed, 213 insertions(+), 271 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 916f8cab75c..2c1b78d3ada 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -183,7 +183,7 @@ public class TaskLockbox
?
savedTaskLock.withPriority(task.getPriority())
: savedTaskLock;
- final TaskLockPosse taskLockPosse = verifyAndCreateOrFindLockPosse(
+ final TaskLockPosse taskLockPosse = reacquireLockOnStartup(
task,
savedTaskLockWithPriority
);
@@ -192,15 +192,11 @@ public class TaskLockbox
if
(savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
- log.info(
- "Reacquired lock[%s] for task: %s",
- taskLock,
- task.getId()
- );
+ log.info("Reacquired lock[%s] for task[%s].", taskLock,
task.getId());
} else {
taskLockCount++;
log.info(
- "Could not reacquire lock on interval[%s] version[%s] (got
version[%s] instead) for task: %s",
+ "Could not reacquire lock on interval[%s] version[%s] (got
version[%s] instead) for task[%s].",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
@@ -210,7 +206,7 @@ public class TaskLockbox
} else {
failedToReacquireLockTaskGroups.add(task.getGroupId());
log.error(
- "Could not reacquire lock on interval[%s] version[%s] for task:
%s from group %s.",
+ "Could not reacquire lock on interval[%s] version[%s] for
task[%s], groupId[%s].",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
task.getId(),
@@ -253,38 +249,28 @@ public class TaskLockbox
}
/**
- * This method is called only in {@link #syncFromStorage()} and verifies the
given task and the taskLock have the same
- * groupId, dataSource, and priority.
+ * Reacquire lock during {@link #syncFromStorage()}.
+ *
+ * @return null if the lock could not be reacquired.
*/
@VisibleForTesting
@Nullable
- protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock
taskLock)
+ protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock)
{
+ if (!taskMatchesLock(task, taskLock)) {
+ log.warn(
+ "Task[datasource: %s, groupId: %s, priority: %s] does not match"
+ + " TaskLock[datasource: %s, groupId: %s, priority: %s].",
+ task.getDataSource(), task.getGroupId(), task.getPriority(),
+ taskLock.getDataSource(), taskLock.getGroupId(),
taskLock.getNonNullPriority()
+ );
+ return null;
+ }
+
giant.lock();
try {
- Preconditions.checkArgument(
- task.getGroupId().equals(taskLock.getGroupId()),
- "lock groupId[%s] is different from task groupId[%s]",
- taskLock.getGroupId(),
- task.getGroupId()
- );
- Preconditions.checkArgument(
- task.getDataSource().equals(taskLock.getDataSource()),
- "lock dataSource[%s] is different from task dataSource[%s]",
- taskLock.getDataSource(),
- task.getDataSource()
- );
final int taskPriority = task.getPriority();
- final int lockPriority = taskLock.getNonNullPriority();
-
- Preconditions.checkArgument(
- lockPriority == taskPriority,
- "lock priority[%s] is different from task priority[%s]",
- lockPriority,
- taskPriority
- );
-
final LockRequest request;
switch (taskLock.getGranularity()) {
case SEGMENT:
@@ -313,15 +299,13 @@ public class TaskLockbox
);
break;
default:
- throw new ISE("Unknown lockGranularity[%s]",
taskLock.getGranularity());
+ throw DruidException.defensive("Unknown lockGranularity[%s]",
taskLock.getGranularity());
}
return createOrFindLockPosse(request, task, false);
}
catch (Exception e) {
- log.error(e,
- "Could not reacquire lock for task: %s from metadata store",
task.getId()
- );
+ log.error(e, "Could not reacquire lock for task[%s] from metadata
store", task.getId());
return null;
}
finally {
@@ -329,6 +313,17 @@ public class TaskLockbox
}
}
+ /**
+ * Returns true if the datasource, groupId and priority of the given Task
+ * match those of the TaskLock.
+ */
+ private boolean taskMatchesLock(Task task, TaskLock taskLock)
+ {
+ return task.getGroupId().equals(taskLock.getGroupId())
+ && task.getDataSource().equals(taskLock.getDataSource())
+ && task.getPriority() == taskLock.getNonNullPriority();
+ }
+
/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
*
@@ -751,13 +746,15 @@ public class TaskLockbox
{
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
- request.getSequenceName(),
- request.getPreviousSegmentId(),
request.getInterval(),
- request.getPartialShardSpec(),
- version,
request.isSkipSegmentLineageCheck(),
- allocatorId
+ new SegmentCreateRequest(
+ request.getSequenceName(),
+ request.getPreviousSegmentId(),
+ version,
+ request.getPartialShardSpec(),
+ allocatorId
+ )
);
}
@@ -1818,7 +1815,6 @@ public class TaskLockbox
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() :
acquiredLock.getVersion(),
action.getPartialShardSpec(),
- null,
((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index a8c4b5117b1..8f47b78a3bf 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -2270,10 +2270,10 @@ public class TaskLockboxTest
}
@Override
- protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock
taskLock)
+ protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock
taskLock)
{
return task.getGroupId()
- .contains("FailingLockAcquisition") ? null :
super.verifyAndCreateOrFindLockPosse(task, taskLock);
+ .contains("FailingLockAcquisition") ? null :
super.reacquireLockOnStartup(task, taskLock);
}
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 54e323581c4..a95d73ce1bb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -36,7 +36,6 @@ import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -242,20 +241,16 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
@Override
public SegmentIdWithShardSpec allocatePendingSegment(
String dataSource,
- String sequenceName,
- String previousSegmentId,
Interval interval,
- PartialShardSpec partialShardSpec,
- String maxVersion,
boolean skipSegmentLineageCheck,
- String taskAllocatorId
+ SegmentCreateRequest createRequest
)
{
return new SegmentIdWithShardSpec(
dataSource,
interval,
- maxVersion,
- partialShardSpec.complete(objectMapper, 0, 0)
+ createRequest.getVersion(),
+ createRequest.getPartialShardSpec().complete(objectMapper, 0, 0)
);
}
diff --git
a/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
b/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
index ec784c44ffb..a18f829efdd 100644
---
a/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
+++
b/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.timeline.partition;
/**
* ShardSpec for non-first-generation segments.
+ * This shardSpec is created only by overwriting tasks using segment locks.
* This shardSpec is allocated a partitionId between {@link
PartitionIds#NON_ROOT_GEN_START_PARTITION_ID} and
* {@link PartitionIds#NON_ROOT_GEN_END_PARTITION_ID}.
*
diff --git
a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
index fc5d0e981c4..2bd93125225 100644
---
a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
+++
b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
@@ -31,6 +31,7 @@ public final class PartitionIds
public static final int ROOT_GEN_END_PARTITION_ID = 32768; // exclusive
/**
* Start partitionId available for non-root generation segments.
+ * Used only with segment locks.
*/
public static final int NON_ROOT_GEN_START_PARTITION_ID = 32768;
/**
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index da54d7e3998..5e840b07b6d 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -26,7 +26,6 @@ import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -215,34 +214,23 @@ public interface IndexerMetadataStorageCoordinator
* Note that a segment sequence may include segments with a variety of
different intervals and versions.
*
* @param dataSource dataSource for which to allocate a segment
- * @param sequenceName name of the group of ingestion tasks
producing a segment series
- * @param previousSegmentId previous segment in the series; may be
null or empty, meaning this is the first
- * segment
* @param interval interval for which to allocate a segment
- * @param partialShardSpec partialShardSpec containing all necessary
information to create a shardSpec for the
- * new segmentId
- * @param maxVersion use this version if we have no better
version to use. The returned segment
- * identifier may have a version lower than
this one, but will not have one higher.
* @param skipSegmentLineageCheck if true, perform lineage validation using
previousSegmentId for this sequence.
* Should be set to false if replica tasks
would index events in same order
- * @param taskAllocatorId The task allocator id with which the
pending segment is associated
* @return the pending segment identifier, or null if it was impossible to
allocate a new segment
*/
+ @Nullable
SegmentIdWithShardSpec allocatePendingSegment(
String dataSource,
- String sequenceName,
- @Nullable String previousSegmentId,
Interval interval,
- PartialShardSpec partialShardSpec,
- String maxVersion,
boolean skipSegmentLineageCheck,
- String taskAllocatorId
+ SegmentCreateRequest createRequest
);
/**
* Delete pending segments created in the given interval belonging to the
given data source from the pending segments
* table. The {@code created_date} field of the pending segments table is
checked to find segments to be deleted.
- *
+ * <p>
* Note that the semantic of the interval (for `created_date`s) is different
from the semantic of the interval
* parameters in some other methods in this class, such as {@link
#retrieveUsedSegmentsForInterval} (where the
* interval is about the time column value in rows belonging to the segment).
@@ -269,7 +257,7 @@ public interface IndexerMetadataStorageCoordinator
* <p/>
* If startMetadata and endMetadata are set, this insertion will be atomic
with a compare-and-swap on dataSource
* commit metadata.
- *
+ * <p>
* If segmentsToDrop is not null and not empty, this insertion will be
atomic with a insert-and-drop on inserting
* {@param segments} and dropping {@param segmentsToDrop}.
*
@@ -426,7 +414,7 @@ public interface IndexerMetadataStorageCoordinator
* Similar to {@link #commitSegments}, but meant for streaming ingestion
tasks for handling
* the case where the task ingested no records and created no segments, but
still needs to update the metadata
* with the progress that the task made.
- *
+ * <p>
* The metadata should undergo the same validation checks as performed by
{@link #commitSegments}.
*
*
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
index 49b31e5e6ff..bcbf9416fe8 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
@@ -38,7 +38,6 @@ public class SegmentCreateRequest
private final String sequenceName;
private final String previousSegmentId;
private final PartialShardSpec partialShardSpec;
- private final String upgradedFromSegmentId;
private final String taskAllocatorId;
public SegmentCreateRequest(
@@ -46,7 +45,6 @@ public class SegmentCreateRequest
String previousSegmentId,
String version,
PartialShardSpec partialShardSpec,
- String upgradedFromSegmentId,
String taskAllocatorId
)
{
@@ -54,24 +52,31 @@ public class SegmentCreateRequest
this.previousSegmentId = previousSegmentId == null ? "" :
previousSegmentId;
this.version = version;
this.partialShardSpec = partialShardSpec;
- this.upgradedFromSegmentId = upgradedFromSegmentId;
this.taskAllocatorId = taskAllocatorId;
}
+ /**
+ * Name of the group of ingestion tasks that produce a segment series.
+ */
public String getSequenceName()
{
return sequenceName;
}
/**
- * Non-null previous segment id. This can be used for persisting to the
- * pending segments table in the metadata store.
+ * Previous segment id allocated for this sequence.
+ *
+ * @return Empty string if there is no previous segment in the series.
*/
public String getPreviousSegmentId()
{
return previousSegmentId;
}
+ /**
+ * Version of the lock held by the task that has requested the segment
allocation.
+ * The allocated segment must have a version less than or equal to this
version.
+ */
public String getVersion()
{
return version;
@@ -82,11 +87,6 @@ public class SegmentCreateRequest
return partialShardSpec;
}
- public String getUpgradedFromSegmentId()
- {
- return upgradedFromSegmentId;
- }
-
public String getTaskAllocatorId()
{
return taskAllocatorId;
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 0717c9b07ee..c85fd1c4960 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -80,7 +80,6 @@ import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import javax.annotation.Nullable;
@@ -350,8 +349,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
/**
* Fetches all the pending segments, whose interval overlaps with the given
search interval, from the metadata store.
*/
- @VisibleForTesting
- List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
+ private List<PendingSegmentRecord> getPendingSegmentsForInterval(
final Handle handle,
final String dataSource,
final Interval interval
@@ -390,7 +388,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return pendingSegments.build();
}
- List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(
+ private List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorId(
final Handle handle,
final String dataSource,
final String taskAllocatorId
@@ -580,7 +578,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
}
- SegmentPublishResult result = SegmentPublishResult.ok(
+ return SegmentPublishResult.ok(
insertSegments(
handle,
segmentsToInsert,
@@ -591,7 +589,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
),
upgradePendingSegmentsOverlappingWith(segmentsToInsert)
);
- return result;
},
3,
getSqlMetadataMaxRetry()
@@ -740,21 +737,16 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
@Override
+ @Nullable
public SegmentIdWithShardSpec allocatePendingSegment(
final String dataSource,
- final String sequenceName,
- @Nullable final String previousSegmentId,
final Interval interval,
- final PartialShardSpec partialShardSpec,
- final String maxVersion,
final boolean skipSegmentLineageCheck,
- String taskAllocatorId
+ final SegmentCreateRequest createRequest
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
- Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
- Preconditions.checkNotNull(maxVersion, "version");
final Interval allocateInterval =
interval.withChronology(ISOChronology.getInstanceUTC());
return connector.retryWithHandle(
@@ -776,24 +768,17 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return allocatePendingSegment(
handle,
dataSource,
- sequenceName,
allocateInterval,
- partialShardSpec,
- maxVersion,
- existingChunks,
- taskAllocatorId
+ createRequest,
+ existingChunks
);
} else {
return allocatePendingSegmentWithSegmentLineageCheck(
handle,
dataSource,
- sequenceName,
- previousSegmentId,
allocateInterval,
- partialShardSpec,
- maxVersion,
- existingChunks,
- taskAllocatorId
+ createRequest,
+ existingChunks
);
}
}
@@ -854,7 +839,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
int currentPartitionNumber =
maxSegmentId.getShardSpec().getPartitionNum();
final List<PendingSegmentRecord> overlappingPendingSegments
- = getPendingSegmentsForIntervalWithHandle(handle, datasource,
replaceInterval);
+ = getPendingSegmentsForInterval(handle, datasource, replaceInterval);
for (PendingSegmentRecord overlappingPendingSegment :
overlappingPendingSegments) {
final SegmentIdWithShardSpec pendingSegmentId =
overlappingPendingSegment.getId();
@@ -929,17 +914,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(
final Handle handle,
final String dataSource,
- final String sequenceName,
- @Nullable final String previousSegmentId,
final Interval interval,
- final PartialShardSpec partialShardSpec,
- final String maxVersion,
- final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
- final String taskAllocatorId
+ final SegmentCreateRequest createRequest,
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
- final String previousSegmentIdNotNull = previousSegmentId == null ? "" :
previousSegmentId;
-
final String sql = StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
@@ -950,15 +929,15 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final Query<Map<String, Object>> query
= handle.createQuery(sql)
.bind("dataSource", dataSource)
- .bind("sequence_name", sequenceName)
- .bind("sequence_prev_id", previousSegmentIdNotNull);
+ .bind("sequence_name", createRequest.getSequenceName())
+ .bind("sequence_prev_id",
createRequest.getPreviousSegmentId());
final String usedSegmentVersion = existingChunks.isEmpty() ? null :
existingChunks.get(0).getVersion();
final CheckExistingSegmentIdResult result = findExistingPendingSegment(
query,
interval,
- sequenceName,
- previousSegmentIdNotNull,
+ createRequest.getSequenceName(),
+ createRequest.getPreviousSegmentId(),
usedSegmentVersion
);
@@ -967,12 +946,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return result.segmentIdentifier;
}
- final SegmentIdWithShardSpec newIdentifier = createNewSegment(
+ final SegmentIdWithShardSpec newIdentifier = createNewPendingSegment(
handle,
dataSource,
interval,
- partialShardSpec,
- maxVersion,
+ createRequest.getPartialShardSpec(),
+ createRequest.getVersion(),
existingChunks
);
if (newIdentifier == null) {
@@ -989,9 +968,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
- .putBytes(StringUtils.toUtf8(sequenceName))
+ .putBytes(StringUtils.toUtf8(createRequest.getSequenceName()))
.putByte((byte) 0xff)
- .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
+
.putBytes(StringUtils.toUtf8(createRequest.getPreviousSegmentId()))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
.hash()
@@ -1003,10 +982,10 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
newIdentifier,
dataSource,
interval,
- previousSegmentIdNotNull,
- sequenceName,
+ createRequest.getPreviousSegmentId(),
+ createRequest.getSequenceName(),
sequenceNamePrevIdSha1,
- taskAllocatorId
+ createRequest.getTaskAllocatorId()
);
return newIdentifier;
}
@@ -1108,12 +1087,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
private SegmentIdWithShardSpec allocatePendingSegment(
final Handle handle,
final String dataSource,
- final String sequenceName,
final Interval interval,
- final PartialShardSpec partialShardSpec,
- final String maxVersion,
- final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
- final String taskAllocatorId
+ final SegmentCreateRequest createRequest,
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
final String sql = StringUtils.format(
@@ -1128,14 +1104,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final Query<Map<String, Object>> query
= handle.createQuery(sql)
.bind("dataSource", dataSource)
- .bind("sequence_name", sequenceName)
+ .bind("sequence_name", createRequest.getSequenceName())
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
final CheckExistingSegmentIdResult result = findExistingPendingSegment(
query,
interval,
- sequenceName,
+ createRequest.getSequenceName(),
null,
existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion()
);
@@ -1144,12 +1120,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return result.segmentIdentifier;
}
- final SegmentIdWithShardSpec newIdentifier = createNewSegment(
+ final SegmentIdWithShardSpec newIdentifier = createNewPendingSegment(
handle,
dataSource,
interval,
- partialShardSpec,
- maxVersion,
+ createRequest.getPartialShardSpec(),
+ createRequest.getVersion(),
existingChunks
);
if (newIdentifier == null) {
@@ -1166,7 +1142,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
- .putBytes(StringUtils.toUtf8(sequenceName))
+ .putBytes(StringUtils.toUtf8(createRequest.getSequenceName()))
.putByte((byte) 0xff)
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis())
@@ -1183,14 +1159,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
dataSource,
interval,
"",
- sequenceName,
+ createRequest.getSequenceName(),
sequenceNamePrevIdSha1,
- taskAllocatorId
+ createRequest.getTaskAllocatorId()
);
log.info(
- "Created new pending segment[%s] for datasource[%s], sequence[%s],
interval[%s].",
- newIdentifier, dataSource, sequenceName, interval
+ "Created new pending segment[%s] for datasource[%s], interval[%s].",
+ newIdentifier, dataSource, interval
);
return newIdentifier;
@@ -1334,7 +1310,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
private static class CheckExistingSegmentIdResult
{
private final boolean found;
- @Nullable
private final SegmentIdWithShardSpec segmentIdentifier;
CheckExistingSegmentIdResult(boolean found, @Nullable
SegmentIdWithShardSpec segmentIdentifier)
@@ -1391,21 +1366,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- private static void bindColumnValuesToQueryWithInCondition(
- final String columnName,
- final List<String> values,
- final Update query
- )
- {
- if (values == null) {
- return;
- }
-
- for (int i = 0; i < values.size(); i++) {
- query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
- }
- }
-
private int deletePendingSegmentsById(Handle handle, String datasource,
List<String> pendingSegmentIds)
{
if (pendingSegmentIds.isEmpty()) {
@@ -1419,7 +1379,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id",
pendingSegmentIds)
)
).bind("dataSource", datasource);
- bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, query);
+ SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id",
pendingSegmentIds, query);
return query.execute();
}
@@ -1442,7 +1402,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String dataSource = appendSegments.iterator().next().getDataSource();
final List<PendingSegmentRecord> segmentIdsForNewVersions =
connector.retryTransaction(
(handle, transactionStatus)
- -> getPendingSegmentsForTaskAllocatorIdWithHandle(handle,
dataSource, taskAllocatorId),
+ -> getPendingSegmentsForTaskAllocatorId(handle, dataSource,
taskAllocatorId),
0,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
@@ -1668,11 +1628,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
- final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>(
- getPendingSegmentsForIntervalWithHandle(handle, dataSource,
interval).stream()
-
.map(PendingSegmentRecord::getId)
-
.collect(Collectors.toSet())
- );
+ final Set<SegmentIdWithShardSpec> pendingSegments =
+ getPendingSegmentsForInterval(handle, dataSource, interval)
+ .stream()
+ .map(PendingSegmentRecord::getId)
+ .collect(Collectors.toSet());
final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments =
new HashMap<>();
final Map<UniqueAllocateRequest, PendingSegmentRecord>
uniqueRequestToSegment = new HashMap<>();
@@ -1686,7 +1646,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
createdSegment = uniqueRequestToSegment.get(uniqueRequest);
} else {
- createdSegment = createNewSegment(
+ createdSegment = createNewPendingSegment(
request,
dataSource,
interval,
@@ -1712,7 +1672,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return createdSegments;
}
- private PendingSegmentRecord createNewSegment(
+ @Nullable
+ private PendingSegmentRecord createNewPendingSegment(
SegmentCreateRequest request,
String dataSource,
Interval interval,
@@ -1775,17 +1736,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
pendingSegmentId,
request.getSequenceName(),
request.getPreviousSegmentId(),
- request.getUpgradedFromSegmentId(),
+ null,
request.getTaskAllocatorId()
);
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
- dataSource,
- interval,
- existingVersion,
- overallMaxId
+ dataSource, interval, existingVersion, overallMaxId
);
return null;
} else if (committedMaxId != null
@@ -1793,8 +1751,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
== SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size
of segment[%s], shardSpec[%s]",
- committedMaxId,
- committedMaxId.getShardSpec()
+ committedMaxId, committedMaxId.getShardSpec()
);
return null;
} else {
@@ -1815,28 +1772,20 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
getTrueAllocatedId(pendingSegmentId),
request.getSequenceName(),
request.getPreviousSegmentId(),
- request.getUpgradedFromSegmentId(),
+ null,
request.getTaskAllocatorId()
);
}
}
/**
- * This function creates a new segment for the given
datasource/interval/etc. A critical
- * aspect of the creation is to make sure that the new version & new
partition number will make
- * sense given the existing segments & pending segments also very important
is to avoid
- * clashes with existing pending & used/unused segments.
- * @param handle Database handle
- * @param dataSource datasource for the new segment
- * @param interval interval for the new segment
+ * Creates a new pending segment for the given datasource and interval.
* @param partialShardSpec Shard spec info minus segment id stuff
* @param existingVersion Version of segments in interval, used to compute
the version of the very first segment in
* interval
- * @return
- * @throws IOException
*/
@Nullable
- private SegmentIdWithShardSpec createNewSegment(
+ private SegmentIdWithShardSpec createNewPendingSegment(
final Handle handle,
final String dataSource,
final Interval interval,
@@ -1876,11 +1825,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
- final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
- getPendingSegmentsForIntervalWithHandle(handle, dataSource,
interval).stream()
-
.map(PendingSegmentRecord::getId)
-
.collect(Collectors.toSet())
- );
+ final Set<SegmentIdWithShardSpec> pendings =
+ getPendingSegmentsForInterval(handle, dataSource, interval)
+ .stream()
+ .map(PendingSegmentRecord::getId)
+ .collect(Collectors.toSet());
+
if (committedMaxId != null) {
pendings.add(committedMaxId);
}
@@ -1910,11 +1860,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
if (overallMaxId == null) {
- // When appending segments, null overallMaxId means that we are
allocating the very initial
- // segment for this time chunk.
- // This code is executed when the Overlord coordinates segment
allocation, which is either you append segments
- // or you use segment lock. Since the core partitions set is not
determined for appended segments, we set
- // it 0. When you use segment lock, the core partitions set doesn't work
with it. We simply set it 0 so that the
+ // We are allocating the very first segment for this time chunk.
+ // Set numCorePartitions to 0 as core partitions are not determined for
append segments.
+ // When you use segment lock, the core partitions set doesn't work with
it. We simply set it 0 so that the
// OvershadowableManager handles the atomic segment update.
final int newPartitionId =
partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
@@ -1929,10 +1877,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s],
existingVersion[%s]: conflicting segment[%s].",
- dataSource,
- interval,
- existingVersion,
- overallMaxId
+ dataSource, interval, existingVersion, overallMaxId
);
return null;
} else if (committedMaxId != null
@@ -1940,14 +1885,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
== SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size
of segment[%s], shardSpec[%s]",
- committedMaxId,
- committedMaxId.getShardSpec()
+ committedMaxId, committedMaxId.getShardSpec()
);
return null;
} else {
- // The number of core partitions must always be chosen from the set of
used segments in the SegmentTimeline.
- // When the core partitions have been dropped, using pending segments
may lead to an incorrect state
- // where the chunk is believed to have core partitions and queries
results are incorrect.
+ // numCorePartitions must always be picked from the committedMaxId and
not overallMaxId
+ // as overallMaxId may refer to a pending segment which might have stale
info of numCorePartitions
final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
dataSource,
interval,
@@ -1963,7 +1906,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
/**
- * Verifies that the allocated id doesn't already exist in the druid
segments table.
+ * Verifies that the allocated id doesn't already exist in the
druid_segments table.
* If yes, try to get the max unallocated id considering the unused segments
for the datasource, version and interval
* Otherwise, use the same id.
* @param allocatedId The segment allocated on the basis of used and pending
segments
@@ -1977,7 +1920,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
// If yes, try to compute allocated partition num using the max unused
segment shard spec
- SegmentId unusedMaxId = getUnusedMaxId(
+ SegmentId unusedMaxId = getMaxIdOfUnusedSegment(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion()
@@ -2002,7 +1945,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
- private SegmentId getUnusedMaxId(String datasource, Interval interval,
String version)
+ /**
+ * Determines the highest ID amongst unused segments for the given
datasource,
+ * interval and version.
+ *
+ * @return null if no unused segment exists for the given parameters.
+ */
+ @Nullable
+ private SegmentId getMaxIdOfUnusedSegment(String datasource, Interval
interval, String version)
{
List<String> unusedSegmentIds =
retrieveUnusedSegmentIdsForExactIntervalAndVersion(
datasource,
@@ -2134,7 +2084,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
.bind("created_date", now)
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
- .bind("partitioned", (segment.getShardSpec() instanceof
NoneShardSpec) ? false : true)
+ .bind("partitioned", !(segment.getShardSpec() instanceof
NoneShardSpec))
.bind("version", segment.getVersion())
.bind("used", usedSegments.contains(segment))
.bind("payload", jsonMapper.writeValueAsBytes(segment))
@@ -2330,9 +2280,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Map<String, String> upgradedFromSegmentIdMap
) throws IOException
{
- boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
-
- if (shouldPersistSchema) {
+ if (shouldPersistSchema(segmentSchemaMapping)) {
persistSchema(handle, segments, segmentSchemaMapping);
}
@@ -2407,6 +2355,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return segmentsToInsert;
}
+ @Nullable
private SegmentMetadata getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
final SegmentId segmentId,
final SegmentSchemaMapping segmentSchemaMapping,
@@ -2786,27 +2735,18 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
+ final String sql = "UPDATE %s SET "
+ + "commit_metadata_payload =
:new_commit_metadata_payload, "
+ + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+ + "WHERE dataSource = :dataSource";
return connector.retryWithHandle(
- new HandleCallback<Boolean>()
- {
- @Override
- public Boolean withHandle(Handle handle)
- {
- final int numRows = handle.createStatement(
- StringUtils.format(
- "UPDATE %s SET "
- + "commit_metadata_payload = :new_commit_metadata_payload,
"
- + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
- + "WHERE dataSource = :dataSource",
- dbTables.getDataSourceTable()
- )
- )
- .bind("dataSource", dataSource)
- .bind("new_commit_metadata_payload",
newCommitMetadataBytes)
- .bind("new_commit_metadata_sha1",
newCommitMetadataSha1)
- .execute();
- return numRows == 1;
- }
+ handle -> {
+ final int numRows = handle.createStatement(StringUtils.format(sql,
dbTables.getDataSourceTable()))
+ .bind("dataSource", dataSource)
+ .bind("new_commit_metadata_payload",
newCommitMetadataBytes)
+ .bind("new_commit_metadata_sha1",
newCommitMetadataSha1)
+ .execute();
+ return numRows == 1;
}
);
}
@@ -3028,7 +2968,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
public List<PendingSegmentRecord> getPendingSegments(String datasource,
Interval interval)
{
return connector.retryWithHandle(
- handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource,
interval)
+ handle -> getPendingSegmentsForInterval(handle, datasource, interval)
);
}
@@ -3178,7 +3118,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
private final boolean failed;
private final boolean canRetry;
- @Nullable
private final String errorMsg;
public static final DataStoreMetadataUpdateResult SUCCESS = new
DataStoreMetadataUpdateResult(false, false, null);
@@ -3198,7 +3137,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
this.failed = failed;
this.canRetry = canRetry;
this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg,
errorFormatArgs);
-
}
public boolean isFailed()
diff --git
a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
index 57e01d76a44..567867bd97e 100644
---
a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
@@ -36,7 +36,6 @@ public class SegmentCreateRequestTest
null,
"version",
partialShardSpec,
- null,
null
);
Assert.assertEquals("sequence", request.getSequenceName());
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index a000fbec5a3..06bbf3b7ecd 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2280,7 +2280,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
null,
@@ -2293,7 +2293,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version",
identifier.toString());
- final SegmentIdWithShardSpec identifier1 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier1 = allocatePendingSegment(
dataSource,
"seq",
identifier.toString(),
@@ -2306,7 +2306,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1",
identifier1.toString());
- final SegmentIdWithShardSpec identifier2 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier2 = allocatePendingSegment(
dataSource,
"seq",
identifier1.toString(),
@@ -2319,7 +2319,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2",
identifier2.toString());
- final SegmentIdWithShardSpec identifier3 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier3 = allocatePendingSegment(
dataSource,
"seq",
identifier1.toString(),
@@ -2333,7 +2333,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2",
identifier3.toString());
Assert.assertEquals(identifier2, identifier3);
- final SegmentIdWithShardSpec identifier4 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier4 = allocatePendingSegment(
dataSource,
"seq1",
null,
@@ -2370,7 +2370,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
null,
@@ -2385,7 +2385,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
// simulate one more load using kafka streaming (as if previous segment
was published, note different sequence name)
- final SegmentIdWithShardSpec identifier1 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier1 = allocatePendingSegment(
dataSource,
"seq2",
identifier.toString(),
@@ -2400,7 +2400,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(0, identifier1.getShardSpec().getNumCorePartitions());
// simulate one more load using kafka streaming (as if previous segment
was published, note different sequence name)
- final SegmentIdWithShardSpec identifier2 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier2 = allocatePendingSegment(
dataSource,
"seq3",
identifier1.toString(),
@@ -2431,7 +2431,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new",
ids.get(0));
// one more load on same interval:
- final SegmentIdWithShardSpec identifier3 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier3 = allocatePendingSegment(
dataSource,
"seq4",
identifier1.toString(),
@@ -2450,7 +2450,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
// and final load, this reproduces an issue that could happen with
multiple streaming appends,
// followed by a reindex, followed by a drop, and more streaming data
coming in for same interval
- final SegmentIdWithShardSpec identifier4 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier4 = allocatePendingSegment(
dataSource,
"seq5",
identifier1.toString(),
@@ -2484,7 +2484,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
null,
@@ -2513,7 +2513,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
// 1.1) simulate one more append load (as if previous segment was
published, note different sequence name)
- final SegmentIdWithShardSpec identifier1 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier1 = allocatePendingSegment(
dataSource,
"seq2",
identifier.toString(),
@@ -2542,7 +2542,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
// 1.2) simulate one more append load (as if previous segment was
published, note different sequence name)
- final SegmentIdWithShardSpec identifier2 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier2 = allocatePendingSegment(
dataSource,
"seq3",
identifier1.toString(),
@@ -2597,7 +2597,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
// unused segment:
// 4) pending segment of version = B, id = 1 <= appending new data, aborted
- final SegmentIdWithShardSpec identifier3 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier3 = allocatePendingSegment(
dataSource,
"seq4",
identifier2.toString(),
@@ -2632,7 +2632,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(1, unused.size());
// Simulate one more append load
- final SegmentIdWithShardSpec identifier4 =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier4 = allocatePendingSegment(
dataSource,
"seq5",
identifier1.toString(),
@@ -2678,7 +2678,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final String sequenceName = "seq";
- final SegmentCreateRequest request = new
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
+ final SegmentCreateRequest request = new
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null);
final SegmentIdWithShardSpec segmentId0 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2690,7 +2690,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1",
segmentId0.toString());
final SegmentCreateRequest request1 =
- new SegmentCreateRequest(sequenceName, segmentId0.toString(),
segmentId0.getVersion(), partialShardSpec, null, null);
+ new SegmentCreateRequest(sequenceName, segmentId0.toString(),
segmentId0.getVersion(), partialShardSpec, null);
final SegmentIdWithShardSpec segmentId1 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2702,7 +2702,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1",
segmentId1.toString());
final SegmentCreateRequest request2 =
- new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null, null);
+ new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null);
final SegmentIdWithShardSpec segmentId2 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2714,7 +2714,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2",
segmentId2.toString());
final SegmentCreateRequest request3 =
- new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null, null);
+ new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null);
final SegmentIdWithShardSpec segmentId3 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2727,7 +2727,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(segmentId2, segmentId3);
final SegmentCreateRequest request4 =
- new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null,
null);
+ new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null);
final SegmentIdWithShardSpec segmentId4 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2747,7 +2747,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final String sequenceName = "seq";
- final SegmentCreateRequest request = new
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
+ final SegmentCreateRequest request = new
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null);
final SegmentIdWithShardSpec segmentId0 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2759,7 +2759,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1",
segmentId0.toString());
final SegmentCreateRequest request1 =
- new SegmentCreateRequest(sequenceName, segmentId0.toString(),
segmentId0.getVersion(), partialShardSpec, null, null);
+ new SegmentCreateRequest(sequenceName, segmentId0.toString(),
segmentId0.getVersion(), partialShardSpec, null);
final SegmentIdWithShardSpec segmentId1 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2771,7 +2771,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1",
segmentId1.toString());
final SegmentCreateRequest request2 =
- new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null, null);
+ new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null);
final SegmentIdWithShardSpec segmentId2 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2783,7 +2783,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2",
segmentId2.toString());
final SegmentCreateRequest request3 =
- new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null, null);
+ new SegmentCreateRequest(sequenceName, segmentId1.toString(),
segmentId1.getVersion(), partialShardSpec, null);
final SegmentIdWithShardSpec segmentId3 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2796,7 +2796,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertEquals(segmentId2, segmentId3);
final SegmentCreateRequest request4 =
- new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null,
null);
+ new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null);
final SegmentIdWithShardSpec segmentId4 =
coordinator.allocatePendingSegments(
dataSource,
interval,
@@ -2833,7 +2833,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final PartialShardSpec partialShardSpec =
NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
null,
@@ -2857,7 +2857,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final DateTime begin = DateTimes.nowUtc();
for (int i = 0; i < 10; i++) {
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
prevSegmentId,
@@ -2873,7 +2873,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final DateTime secondBegin = DateTimes.nowUtc();
for (int i = 0; i < 5; i++) {
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
prevSegmentId,
@@ -2901,7 +2901,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
String prevSegmentId = null;
for (int i = 0; i < 10; i++) {
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
dataSource,
"seq",
prevSegmentId,
@@ -2970,7 +2970,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
- SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+ SegmentIdWithShardSpec id = allocatePendingSegment(
dataSource,
"seq",
null,
@@ -3003,7 +3003,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
);
- id = coordinator.allocatePendingSegment(
+ id = allocatePendingSegment(
dataSource,
"seq2",
null,
@@ -3036,7 +3036,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
);
- id = coordinator.allocatePendingSegment(
+ id = allocatePendingSegment(
dataSource,
"seq3",
null,
@@ -3084,7 +3084,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
);
}
coordinator.commitSegments(originalSegments, new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION));
- final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec id = allocatePendingSegment(
datasource,
"seq",
null,
@@ -3130,7 +3130,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
);
}
coordinator.commitSegments(originalSegments, new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION));
- final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec id = allocatePendingSegment(
datasource,
"seq",
null,
@@ -3377,7 +3377,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertTrue(coordinator.commitSegments(tombstones, new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(tombstones));
// Allocate and commit a data segment by appending to the same interval
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
TestDataSource.WIKI,
"seq",
tombstoneSegment.getVersion(),
@@ -3432,7 +3432,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertTrue(coordinator.commitSegments(tombstones, new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(tombstones));
// Allocate and commit a data segment by appending to the same interval
- final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec identifier = allocatePendingSegment(
TestDataSource.WIKI,
"seq",
tombstoneSegment.getVersion(),
@@ -3471,7 +3471,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
@Test
public void testSegmentIdShouldNotBeReallocated()
{
- final SegmentIdWithShardSpec idWithNullTaskAllocator =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec idWithNullTaskAllocator =
allocatePendingSegment(
TestDataSource.WIKI,
"seq",
"0",
@@ -3487,7 +3487,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
idWithNullTaskAllocator.getShardSpec()
);
- final SegmentIdWithShardSpec idWithValidTaskAllocator =
coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec idWithValidTaskAllocator =
allocatePendingSegment(
TestDataSource.WIKI,
"seq",
"1",
@@ -3510,7 +3510,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
// Mark all segments as unused
coordinator.markSegmentsAsUnusedWithinInterval(TestDataSource.WIKI,
Intervals.ETERNITY);
- final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment(
+ final SegmentIdWithShardSpec theId = allocatePendingSegment(
TestDataSource.WIKI,
"seq",
"2",
@@ -3791,6 +3791,30 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
);
}
+ private SegmentIdWithShardSpec allocatePendingSegment(
+ String datasource,
+ String sequenceName,
+ String previousSegmentId,
+ Interval interval,
+ PartialShardSpec partialShardSpec,
+ String maxVersion,
+ boolean skipSegmentLineageCheck,
+ String taskAllocatorId
+ )
+ {
+ return coordinator.allocatePendingSegment(
+ datasource,
+ interval,
+ skipSegmentLineageCheck,
+ new SegmentCreateRequest(
+ sequenceName,
+ previousSegmentId,
+ maxVersion,
+ partialShardSpec,
+ taskAllocatorId
+ )
+ );
+ }
private void insertUsedSegments(Set<DataSegment> segments, Map<String,
String> upgradedFromSegmentIdMap)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]