AmatyaAvadhanula commented on code in PR #13369: URL: https://github.com/apache/druid/pull/13369#discussion_r1035547408
########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -777,6 +994,202 @@ private void insertPendingSegmentIntoMetastore( .execute(); } + private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments( + Handle handle, + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + List<SegmentCreateRequest> requests + ) throws IOException + { + if (requests.isEmpty()) { + return Collections.emptyMap(); + } + + // Get the time chunk and associated data segments for the given interval, if any + final List<TimelineObjectHolder<String, DataSegment>> existingChunks = + getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) + .lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.", + dataSource, + interval, + existingChunks.size() + ); + return Collections.emptyMap(); + } + + // Shard spec of any of the requests (as they are all compatible) can be used to + // identify existing shard specs that share partition space with the requested ones. + final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec(); + + // max partitionId of published data segments which share the same partition space. + SegmentIdWithShardSpec committedMaxId = null; + + @Nullable + final String versionOfExistingChunk; + if (existingChunks.isEmpty()) { + versionOfExistingChunk = null; + } else { + TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks); + versionOfExistingChunk = existingHolder.getVersion(); + + // Don't use the stream API for performance. 
+ for (DataSegment segment : FluentIterable + .from(existingHolder.getObject()) + .transform(PartitionChunk::getObject) + // Here we check only the segments of the shardSpec which shares the same partition space with the given + // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. + // See PartitionIds. + .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { + if (committedMaxId == null + || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); + } + } + } + + + // Fetch the pending segments for this interval to determine max partitionId + // across all shard specs (published + pending). + // A pending segment having a higher partitionId must also be considered + // to avoid clashes when inserting the pending segment created here. + final Set<SegmentIdWithShardSpec> pendingSegments = + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval); + if (committedMaxId != null) { + pendingSegments.add(committedMaxId); + } + + final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>(); + final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new HashMap<>(); + + for (SegmentCreateRequest request : requests) { + // Check if the required segment has already been created in this batch + final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck); + + final SegmentIdWithShardSpec createdSegment; + if (sequenceHashToSegment.containsKey(sequenceHash)) { + createdSegment = sequenceHashToSegment.get(sequenceHash); + } else { + createdSegment = createNewSegment( + request, + dataSource, + interval, + versionOfExistingChunk, + committedMaxId, + pendingSegments + ); + + // Add to pendingSegments to consider for partitionId + if (createdSegment != null) { + pendingSegments.add(createdSegment); + 
sequenceHashToSegment.put(sequenceHash, createdSegment); + log.info("Created new segment [%s]", createdSegment); + } + } + + if (createdSegment != null) { + createdSegments.put(request, createdSegment); + } + } + + log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size()); + return createdSegments; + } + + private SegmentIdWithShardSpec createNewSegment( + SegmentCreateRequest request, + String dataSource, + Interval interval, + String versionOfExistingChunk, + SegmentIdWithShardSpec committedMaxId, + Set<SegmentIdWithShardSpec> pendingSegments + ) + { + final PartialShardSpec partialShardSpec = request.getPartialShardSpec(); + final String existingVersion = request.getVersion(); + + // If there is an existing chunk, find the max id with the same version as the existing chunk. + // There may still be a pending segment with a higher version (but no corresponding used segments) + // which may generate a clash with an existing segment once the new id is generated + final SegmentIdWithShardSpec overallMaxId = Review Comment: The overallMaxId is computed assuming that committedMaxId has already been added to the set of pendingSegments. I think it's better not to make that assumption and instead add committedMaxId explicitly in this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org