kfaraz commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1028109019


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -777,6 +994,202 @@ private void insertPendingSegmentIntoMetastore(
           .execute();
   }
 
+  private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
+      Handle handle,
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentLineageCheck,
+      List<SegmentCreateRequest> requests
+  ) throws IOException
+  {
+    if (requests.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Get the time chunk and associated data segments for the given interval, 
if any
+    final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
+        getTimelineForIntervalsWithHandle(handle, dataSource, 
Collections.singletonList(interval))
+            .lookup(interval);
+
+    if (existingChunks.size() > 1) {
+      // Not possible to expand more than one chunk with a single segment.
+      log.warn(
+          "Cannot allocate new segments for dataSource[%s], interval[%s]: 
already have [%,d] chunks.",
+          dataSource,
+          interval,
+          existingChunks.size()
+      );
+      return Collections.emptyMap();
+    }
+
+    // Shard spec of any of the requests (as they are all compatible) can be 
used to
+    // identify existing shard specs that share partition space with the 
requested ones.
+    final PartialShardSpec partialShardSpec = 
requests.get(0).getPartialShardSpec();
+
+    // max partitionId of published data segments which share the same 
partition space.
+    SegmentIdWithShardSpec committedMaxId = null;
+
+    @Nullable
+    final String versionOfExistingChunk;
+    if (existingChunks.isEmpty()) {
+      versionOfExistingChunk = null;
+    } else {
+      TimelineObjectHolder<String, DataSegment> existingHolder = 
Iterables.getOnlyElement(existingChunks);
+      versionOfExistingChunk = existingHolder.getVersion();
+
+      // Don't use the stream API for performance.
+      for (DataSegment segment : FluentIterable
+          .from(existingHolder.getObject())
+          .transform(PartitionChunk::getObject)
+          // Here we check only the segments of the shardSpec which shares the 
same partition space with the given
+          // partialShardSpec. Note that OverwriteShardSpec doesn't share the 
partition space with others.
+          // See PartitionIds.
+          .filter(segment -> 
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
+        if (committedMaxId == null
+            || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
+          committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
+        }
+      }
+    }
+
+
+    // Fetch the pending segments for this interval to determine max 
partitionId
+    // across all shard specs (published + pending).
+    // A pending segment having a higher partitionId must also be considered
+    // to avoid clashes when inserting the pending segment created here.
+    final Set<SegmentIdWithShardSpec> pendingSegments =
+        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
+    if (committedMaxId != null) {
+      pendingSegments.add(committedMaxId);
+    }
+
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = 
new HashMap<>();
+    final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new 
HashMap<>();
+
+    for (SegmentCreateRequest request : requests) {
+      // Check if the required segment has already been created in this batch
+      final String sequenceHash = getSequenceNameAndPrevIdSha(request, 
interval, skipSegmentLineageCheck);
+
+      final SegmentIdWithShardSpec createdSegment;
+      if (sequenceHashToSegment.containsKey(sequenceHash)) {
+        createdSegment = sequenceHashToSegment.get(sequenceHash);
+      } else {
+        createdSegment = createNewSegment(
+            request,
+            dataSource,
+            interval,
+            versionOfExistingChunk,
+            committedMaxId,
+            pendingSegments
+        );
+
+        // Add to pendingSegments to consider for partitionId
+        if (createdSegment != null) {
+          pendingSegments.add(createdSegment);
+          sequenceHashToSegment.put(sequenceHash, createdSegment);
+          log.info("Created new segment [%s]", createdSegment);
+        }
+      }
+
+      if (createdSegment != null) {
+        createdSegments.put(request, createdSegment);
+      }
+    }
+
+    log.info("Created [%d] new segments for [%d] allocate requests.", 
sequenceHashToSegment.size(), requests.size());
+    return createdSegments;
+  }
+
+  private SegmentIdWithShardSpec createNewSegment(

Review Comment:
   With time chunk locking, yes. With segment locking, they may be in a batch 
but still have different versions. This is just because of the way a segment 
lock is acquired (see `TaskLockbox.tryLock()` and 
`LockRequestForNewSegment.getVersion()`).  The datasource, groupId and interval 
are always the same in both the cases. (see `AllocateRequestKey` in 
`SegmentAllocationQueue`).
   
   The new code is written like this in order to minimize risk by retaining the 
same behaviour as the old flow except club the metadata calls together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to