github-advanced-security[bot] commented on code in PR #17420:
URL: https://github.com/apache/druid/pull/17420#discussion_r1821679094


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -452,6 +463,232 @@
     }
   }
 
+  public void cacheSegmentPublishResults(Task task, Collection<DataSegment> 
segments)
+  {
+    if (!segmentAllocationReduceMetadataIO) {
+      return;
+    }
+    Map<Interval, DataSegment> intervalToMaxSegment = new HashMap<>();
+    for (DataSegment segment : segments) {
+      final Interval interval = segment.getInterval();
+      if (intervalToMaxSegment.get(interval) == null
+          || intervalToMaxSegment.get(interval).getId().getPartitionNum() < 
segment.getId().getPartitionNum()) {
+        intervalToMaxSegment.put(interval, segment);
+      }
+    }
+    for (Map.Entry<Interval, DataSegment> entry : 
intervalToMaxSegment.entrySet()) {
+      final Interval interval = entry.getKey();
+      for (TaskLockPosse posse : 
findLockPossesOverlapsInterval(task.getDataSource(), interval)) {
+        if (posse.containsTask(task)) {
+          if (posse.getTaskLock().getInterval().equals(interval) && 
posse.visibleSegmentState != null) {
+            posse.visibleSegmentState.updateMaxSegment(entry.getValue());
+          }
+        }
+      }
+    }
+  }
+
+  public boolean canAllocateSegmentWithReducedMetadataIO(final LockGranularity 
granularity, final TaskLockType lockType)
+  {
+    return segmentAllocationReduceMetadataIO
+           && granularity == LockGranularity.TIME_CHUNK
+           && (lockType == TaskLockType.EXCLUSIVE || lockType == 
TaskLockType.SHARED);
+  }
+
+  public LockResult allocateSegmentWithReducedMetadataIO(
+      final Task task,
+      final TaskLockType lockType,
+      final DateTime timestamp,
+      final Granularity queryGranularity,
+      final Granularity preferredSegmentGranularity,
+      final String sequenceName,
+      final String previousSegmentId,
+      final boolean skipSegmentLineageCheck,
+      final PartialShardSpec partialShardSpec
+  )
+  {
+    giant.lock();
+
+    try {
+      final Interval rowInterval = 
queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());
+
+      for (Granularity segmentGranularity : 
Granularity.granularitiesFinerThan(preferredSegmentGranularity)) {
+        final Interval segmentInterval = segmentGranularity.bucket(timestamp)
+                                                           
.withChronology(ISOChronology.getInstanceUTC());
+        if (!segmentInterval.contains(rowInterval)) {
+          break;
+        }
+        final VisibleSegmentState visibleSegmentState = 
determineVisibleSegmentState(task, segmentInterval);
+        if (visibleSegmentState == null) {
+          continue;
+        }
+
+        if (!visibleSegmentState.interval.contains(rowInterval)) {
+          continue;
+        }
+
+        if (visibleSegmentState.committedVersion == null && 
!visibleSegmentState.interval.equals(segmentInterval)) {
+          continue;
+        }
+
+        final TimeChunkLockRequest lockRequest
+            = new TimeChunkLockRequest(lockType, task, 
visibleSegmentState.interval, null);
+
+        final TaskLockPosse lockPosse = createOrFindLockPosse(lockRequest, 
task, true);
+
+        if (lockPosse.getTaskLock().isRevoked()) {
+          log.warn("Lock[%s] is revoked.", lockPosse.getTaskLock());
+          return LockResult.revoked(lockPosse.getTaskLock());
+        }
+
+        if 
(!lockPosse.getTaskLock().getInterval().equals(visibleSegmentState.interval)) {
+          log.warn(
+              "Interval[%s] of existing segments for datasource[%s] does not 
match the lock interval[%s]",
+              visibleSegmentState.interval, task.getDataSource(), 
lockPosse.taskLock.getInterval()
+          );
+          unlock(task, lockPosse.getTaskLock().getInterval());
+          continue;
+        }
+
+        if (lockPosse.visibleSegmentState == null) {
+          lockPosse.visibleSegmentState = visibleSegmentState;
+        } else if (!lockPosse.visibleSegmentState.equals(visibleSegmentState)) 
{
+          throw DruidException.defensive("VisibleSegmentState mismatch for 
lock[%s]", lockPosse.getTaskLock());
+        }
+
+        final String allocatorId = ((PendingSegmentAllocatingTask) 
task).getTaskAllocatorId();
+        SegmentIdWithShardSpec allocatedId = 
((IndexerSQLMetadataStorageCoordinator) metadataStorageCoordinator)
+            .findOrInsertPendingSegmentRecord(
+                task.getDataSource(),
+                visibleSegmentState.interval,
+                visibleSegmentState.committedVersion,
+                
visibleSegmentState.getVersion(lockPosse.taskLock.getVersion()),
+                visibleSegmentState.getMaxId(),
+                visibleSegmentState.numCorePartitions,
+                partialShardSpec,
+                sequenceName,
+                previousSegmentId,
+                skipSegmentLineageCheck,
+                allocatorId
+            );
+
+        if (allocatedId != null) {
+          visibleSegmentState.addPendingSegment(
+              new PendingSegmentRecord(allocatedId, sequenceName, 
previousSegmentId, null, allocatorId)
+          );
+        }
+        log.info("Found or Allocated pendingSegment[%s].", 
allocatedId.asSegmentId());

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [allocatedId](1) may be null at this access as suggested by 
[this](2) null guard.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/8440)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to