AmatyaAvadhanula commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1028077554


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -777,6 +994,202 @@ private void insertPendingSegmentIntoMetastore(
           .execute();
   }
 
+  private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
+      Handle handle,
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentLineageCheck,
+      List<SegmentCreateRequest> requests
+  ) throws IOException
+  {
+    if (requests.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Get the time chunk and associated data segments for the given interval, 
if any
+    final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
+        getTimelineForIntervalsWithHandle(handle, dataSource, 
Collections.singletonList(interval))
+            .lookup(interval);
+
+    if (existingChunks.size() > 1) {
+      // Not possible to expand more than one chunk with a single segment.
+      log.warn(
+          "Cannot allocate new segments for dataSource[%s], interval[%s]: 
already have [%,d] chunks.",
+          dataSource,
+          interval,
+          existingChunks.size()
+      );
+      return Collections.emptyMap();
+    }
+
+    // Shard spec of any of the requests (as they are all compatible) can be 
used to
+    // identify existing shard specs that share partition space with the 
requested ones.
+    final PartialShardSpec partialShardSpec = 
requests.get(0).getPartialShardSpec();
+
+    // max partitionId of published data segments which share the same 
partition space.
+    SegmentIdWithShardSpec committedMaxId = null;
+
+    @Nullable
+    final String versionOfExistingChunk;
+    if (existingChunks.isEmpty()) {
+      versionOfExistingChunk = null;
+    } else {
+      TimelineObjectHolder<String, DataSegment> existingHolder = 
Iterables.getOnlyElement(existingChunks);
+      versionOfExistingChunk = existingHolder.getVersion();
+
+      // Don't use the stream API for performance.
+      for (DataSegment segment : FluentIterable
+          .from(existingHolder.getObject())
+          .transform(PartitionChunk::getObject)
+          // Here we check only the segments of the shardSpec which shares the 
same partition space with the given
+          // partialShardSpec. Note that OverwriteShardSpec doesn't share the 
partition space with others.
+          // See PartitionIds.
+          .filter(segment -> 
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
+        if (committedMaxId == null
+            || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
+          committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
+        }
+      }
+    }
+
+
+    // Fetch the pending segments for this interval to determine max 
partitionId
+    // across all shard specs (published + pending).
+    // A pending segment having a higher partitionId must also be considered
+    // to avoid clashes when inserting the pending segment created here.
+    final Set<SegmentIdWithShardSpec> pendingSegments =
+        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
+    if (committedMaxId != null) {
+      pendingSegments.add(committedMaxId);
+    }
+
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = 
new HashMap<>();
+    final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new 
HashMap<>();
+
+    for (SegmentCreateRequest request : requests) {
+      // Check if the required segment has already been created in this batch
+      final String sequenceHash = getSequenceNameAndPrevIdSha(request, 
interval, skipSegmentLineageCheck);
+
+      final SegmentIdWithShardSpec createdSegment;
+      if (sequenceHashToSegment.containsKey(sequenceHash)) {
+        createdSegment = sequenceHashToSegment.get(sequenceHash);
+      } else {
+        createdSegment = createNewSegment(
+            request,
+            dataSource,
+            interval,
+            versionOfExistingChunk,
+            committedMaxId,
+            pendingSegments
+        );
+
+        // Add to pendingSegments to consider for partitionId
+        if (createdSegment != null) {
+          pendingSegments.add(createdSegment);
+          sequenceHashToSegment.put(sequenceHash, createdSegment);
+          log.info("Created new segment [%s]", createdSegment);
+        }
+      }
+
+      if (createdSegment != null) {
+        createdSegments.put(request, createdSegment);
+      }
+    }
+
+    log.info("Created [%d] new segments for [%d] allocate requests.", 
sequenceHashToSegment.size(), requests.size());
+    return createdSegments;
+  }
+
+  private SegmentIdWithShardSpec createNewSegment(

Review Comment:
   The idea behind this suggestion was that the requests being batched to 
createXNewSegments would all have the same version and be for the same 
interval, datasource as well. 
   It would then simply allocate X new segments with shard specs ranging from 
overallMaxId + 1 ... overallMaxId + X with the rest of the logic remaining the 
same



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to