AmatyaAvadhanula commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1027651278
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -777,6 +994,202 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
+ private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
+ Handle handle,
+ String dataSource,
+ Interval interval,
+ boolean skipSegmentLineageCheck,
+ List<SegmentCreateRequest> requests
+ ) throws IOException
+ {
+ if (requests.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ // Get the time chunk and associated data segments for the given interval,
if any
+ final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
+ getTimelineForIntervalsWithHandle(handle, dataSource,
Collections.singletonList(interval))
+ .lookup(interval);
+
+ if (existingChunks.size() > 1) {
+ // Not possible to expand more than one chunk with a single segment.
+ log.warn(
+ "Cannot allocate new segments for dataSource[%s], interval[%s]:
already have [%,d] chunks.",
+ dataSource,
+ interval,
+ existingChunks.size()
+ );
+ return Collections.emptyMap();
+ }
+
+ // Shard spec of any of the requests (as they are all compatible) can be
used to
+ // identify existing shard specs that share partition space with the
requested ones.
+ final PartialShardSpec partialShardSpec =
requests.get(0).getPartialShardSpec();
+
+ // max partitionId of published data segments which share the same
partition space.
+ SegmentIdWithShardSpec committedMaxId = null;
+
+ @Nullable
+ final String versionOfExistingChunk;
+ if (existingChunks.isEmpty()) {
+ versionOfExistingChunk = null;
+ } else {
+ TimelineObjectHolder<String, DataSegment> existingHolder =
Iterables.getOnlyElement(existingChunks);
+ versionOfExistingChunk = existingHolder.getVersion();
+
+ // Don't use the stream API for performance.
+ for (DataSegment segment : FluentIterable
+ .from(existingHolder.getObject())
+ .transform(PartitionChunk::getObject)
+ // Here we check only the segments of the shardSpec which shares the
same partition space with the given
+ // partialShardSpec. Note that OverwriteShardSpec doesn't share the
partition space with others.
+ // See PartitionIds.
+ .filter(segment ->
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
+ if (committedMaxId == null
+ || committedMaxId.getShardSpec().getPartitionNum() <
segment.getShardSpec().getPartitionNum()) {
+ committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
+ }
+ }
+ }
+
+
+ // Fetch the pending segments for this interval to determine max
partitionId
+ // across all shard specs (published + pending).
+ // A pending segment having a higher partitionId must also be considered
+ // to avoid clashes when inserting the pending segment created here.
+ final Set<SegmentIdWithShardSpec> pendingSegments =
+ getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
+ if (committedMaxId != null) {
+ pendingSegments.add(committedMaxId);
+ }
+
+ final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
new HashMap<>();
+ final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new
HashMap<>();
+
+ for (SegmentCreateRequest request : requests) {
+ // Check if the required segment has already been created in this batch
+ final String sequenceHash = getSequenceNameAndPrevIdSha(request,
interval, skipSegmentLineageCheck);
+
+ final SegmentIdWithShardSpec createdSegment;
+ if (sequenceHashToSegment.containsKey(sequenceHash)) {
+ createdSegment = sequenceHashToSegment.get(sequenceHash);
+ } else {
+ createdSegment = createNewSegment(
+ request,
+ dataSource,
+ interval,
+ versionOfExistingChunk,
+ committedMaxId,
+ pendingSegments
+ );
+
+ // Add to pendingSegments to consider for partitionId
+ if (createdSegment != null) {
+ pendingSegments.add(createdSegment);
+ sequenceHashToSegment.put(sequenceHash, createdSegment);
+ log.info("Created new segment [%s]", createdSegment);
+ }
+ }
+
+ if (createdSegment != null) {
+ createdSegments.put(request, createdSegment);
+ }
+ }
+
+ log.info("Created [%d] new segments for [%d] allocate requests.",
sequenceHashToSegment.size(), requests.size());
+ return createdSegments;
+ }
+
+ private SegmentIdWithShardSpec createNewSegment(
Review Comment:
Would it be possible to utilize the older method `createNewSegment` and pass
the number of segments to be created as an argument?
If all shardSpecs with common properties can be created with increasing
partitionIds in the same call, it could simplify the logic and improve
performance as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]