kfaraz commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1027675736


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -777,6 +994,202 @@ private void insertPendingSegmentIntoMetastore(
           .execute();
   }
 
+  private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
+      Handle handle,
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentLineageCheck,
+      List<SegmentCreateRequest> requests
+  ) throws IOException
+  {
+    if (requests.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Get the time chunk and associated data segments for the given interval, if any
+    final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
+        getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
+            .lookup(interval);
+
+    if (existingChunks.size() > 1) {
+      // Not possible to expand more than one chunk with a single segment.
+      log.warn(
+          "Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.",
+          dataSource,
+          interval,
+          existingChunks.size()
+      );
+      return Collections.emptyMap();
+    }
+
+    // Shard spec of any of the requests (as they are all compatible) can be used to
+    // identify existing shard specs that share partition space with the requested ones.
+    final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec();
+
+    // max partitionId of published data segments which share the same partition space.
+    SegmentIdWithShardSpec committedMaxId = null;
+
+    @Nullable
+    final String versionOfExistingChunk;
+    if (existingChunks.isEmpty()) {
+      versionOfExistingChunk = null;
+    } else {
+      TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
+      versionOfExistingChunk = existingHolder.getVersion();
+
+      // Don't use the stream API for performance.
+      for (DataSegment segment : FluentIterable
+          .from(existingHolder.getObject())
+          .transform(PartitionChunk::getObject)
+          // Here we check only the segments of the shardSpec which shares the same partition space with the given
+          // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others.
+          // See PartitionIds.
+          .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
+        if (committedMaxId == null
+            || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
+          committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
+        }
+      }
+    }
+
+
+    // Fetch the pending segments for this interval to determine max partitionId
+    // across all shard specs (published + pending).
+    // A pending segment having a higher partitionId must also be considered
+    // to avoid clashes when inserting the pending segment created here.
+    final Set<SegmentIdWithShardSpec> pendingSegments =
+        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
+    if (committedMaxId != null) {
+      pendingSegments.add(committedMaxId);
+    }
+
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>();
+    final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new HashMap<>();
+
+    for (SegmentCreateRequest request : requests) {
+      // Check if the required segment has already been created in this batch
+      final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck);
+
+      final SegmentIdWithShardSpec createdSegment;
+      if (sequenceHashToSegment.containsKey(sequenceHash)) {
+        createdSegment = sequenceHashToSegment.get(sequenceHash);
+      } else {
+        createdSegment = createNewSegment(
+            request,
+            dataSource,
+            interval,
+            versionOfExistingChunk,
+            committedMaxId,
+            pendingSegments
+        );
+
+        // Add to pendingSegments to consider for partitionId
+        if (createdSegment != null) {
+          pendingSegments.add(createdSegment);
+          sequenceHashToSegment.put(sequenceHash, createdSegment);
+          log.info("Created new segment [%s]", createdSegment);
+        }
+      }
+
+      if (createdSegment != null) {
+        createdSegments.put(request, createdSegment);
+      }
+    }
+
+    log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size());
+    return createdSegments;
+  }
+
+  private SegmentIdWithShardSpec createNewSegment(

Review Comment:
   I suppose we could pull out the computation of the `overallMaxId`, as that remains the same across the requests in a batch (apart from also accounting for the ids allocated so far in that batch). The rest of this method would be complicated to pull out because each request can have a different version, depending on the underlying lock that it has created/acquired. Plus, that part is not computationally heavy, so factoring it out would not give any perf benefit either.
   
   To deduplicate code, the older `createNewSegment` can call this method instead.
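
For illustration, a minimal sketch of what pulling out the max id computation could look like, assuming a heavily simplified model. `BatchMaxIdSketch`, `SegmentId`, `overallMaxPartition` and `allocateBatch` are hypothetical names, not Druid APIs, and the real method also has to account for shard specs and the version of the existing chunk. The existing ids are scanned once per batch, and ids allocated earlier in the batch are folded in by bumping a counter, while each request keeps its own version:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch: SegmentId and both methods are hypothetical, not Druid APIs.
class BatchMaxIdSketch {

  /** Hypothetical stand-in for SegmentIdWithShardSpec: a version plus a partition number. */
  static final class SegmentId {
    final String version;
    final int partitionNum;

    SegmentId(String version, int partitionNum) {
      this.version = version;
      this.partitionNum = partitionNum;
    }
  }

  /** Highest partition number among committed and pending ids, or -1 if there are none. */
  static int overallMaxPartition(List<SegmentId> committedAndPending) {
    int max = -1;
    for (SegmentId id : committedAndPending) {
      max = Math.max(max, id.partitionNum);
    }
    return max;
  }

  /**
   * Allocates one id per request. The scan over existing ids happens once per batch;
   * ids allocated earlier in the batch are accounted for by bumping the counter.
   * Each request keeps its own version, as it would with per-request locks.
   */
  static List<SegmentId> allocateBatch(List<SegmentId> committedAndPending, List<String> requestVersions) {
    int maxSoFar = overallMaxPartition(committedAndPending);
    List<SegmentId> allocated = new ArrayList<>();
    for (String version : requestVersions) {
      maxSoFar++;
      allocated.add(new SegmentId(version, maxSoFar));
    }
    return allocated;
  }

  public static void main(String[] args) {
    List<SegmentId> existing = Arrays.asList(new SegmentId("v1", 0), new SegmentId("v1", 3));
    for (SegmentId id : allocateBatch(existing, Arrays.asList("v1", "v2"))) {
      System.out.println(id.version + " -> partition " + id.partitionNum);
    }
  }
}
```

In this sketch the only per-request state is the version, which mirrors the point above that the version-dependent part is hard to factor out of the loop.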



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]