github-advanced-security[bot] commented on code in PR #17420:
URL: https://github.com/apache/druid/pull/17420#discussion_r1821679094
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -452,6 +463,232 @@
}
}
+ public void cacheSegmentPublishResults(Task task, Collection<DataSegment>
segments)
+ {
+ if (!segmentAllocationReduceMetadataIO) {
+ return;
+ }
+ Map<Interval, DataSegment> intervalToMaxSegment = new HashMap<>();
+ for (DataSegment segment : segments) {
+ final Interval interval = segment.getInterval();
+ if (intervalToMaxSegment.get(interval) == null
+ || intervalToMaxSegment.get(interval).getId().getPartitionNum() <
segment.getId().getPartitionNum()) {
+ intervalToMaxSegment.put(interval, segment);
+ }
+ }
+ for (Map.Entry<Interval, DataSegment> entry :
intervalToMaxSegment.entrySet()) {
+ final Interval interval = entry.getKey();
+ for (TaskLockPosse posse :
findLockPossesOverlapsInterval(task.getDataSource(), interval)) {
+ if (posse.containsTask(task)) {
+ if (posse.getTaskLock().getInterval().equals(interval) &&
posse.visibleSegmentState != null) {
+ posse.visibleSegmentState.updateMaxSegment(entry.getValue());
+ }
+ }
+ }
+ }
+ }
+
+ public boolean canAllocateSegmentWithReducedMetadataIO(final LockGranularity
granularity, final TaskLockType lockType)
+ {
+ return segmentAllocationReduceMetadataIO
+ && granularity == LockGranularity.TIME_CHUNK
+ && (lockType == TaskLockType.EXCLUSIVE || lockType ==
TaskLockType.SHARED);
+ }
+
+ public LockResult allocateSegmentWithReducedMetadataIO(
+ final Task task,
+ final TaskLockType lockType,
+ final DateTime timestamp,
+ final Granularity queryGranularity,
+ final Granularity preferredSegmentGranularity,
+ final String sequenceName,
+ final String previousSegmentId,
+ final boolean skipSegmentLineageCheck,
+ final PartialShardSpec partialShardSpec
+ )
+ {
+ giant.lock();
+
+ try {
+ final Interval rowInterval =
queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());
+
+ for (Granularity segmentGranularity :
Granularity.granularitiesFinerThan(preferredSegmentGranularity)) {
+ final Interval segmentInterval = segmentGranularity.bucket(timestamp)
+
.withChronology(ISOChronology.getInstanceUTC());
+ if (!segmentInterval.contains(rowInterval)) {
+ break;
+ }
+ final VisibleSegmentState visibleSegmentState =
determineVisibleSegmentState(task, segmentInterval);
+ if (visibleSegmentState == null) {
+ continue;
+ }
+
+ if (!visibleSegmentState.interval.contains(rowInterval)) {
+ continue;
+ }
+
+ if (visibleSegmentState.committedVersion == null &&
!visibleSegmentState.interval.equals(segmentInterval)) {
+ continue;
+ }
+
+ final TimeChunkLockRequest lockRequest
+ = new TimeChunkLockRequest(lockType, task,
visibleSegmentState.interval, null);
+
+ final TaskLockPosse lockPosse = createOrFindLockPosse(lockRequest,
task, true);
+
+ if (lockPosse.getTaskLock().isRevoked()) {
+ log.warn("Lock[%s] is revoked.", lockPosse.getTaskLock());
+ return LockResult.revoked(lockPosse.getTaskLock());
+ }
+
+ if
(!lockPosse.getTaskLock().getInterval().equals(visibleSegmentState.interval)) {
+ log.warn(
+ "Interval[%s] of existing segments for datasource[%s] does not
match the lock interval[%s]",
+ visibleSegmentState.interval, task.getDataSource(),
lockPosse.taskLock.getInterval()
+ );
+ unlock(task, lockPosse.getTaskLock().getInterval());
+ continue;
+ }
+
+ if (lockPosse.visibleSegmentState == null) {
+ lockPosse.visibleSegmentState = visibleSegmentState;
+ } else if (!lockPosse.visibleSegmentState.equals(visibleSegmentState))
{
+ throw DruidException.defensive("VisibleSegmentState mismatch for
lock[%s]", lockPosse.getTaskLock());
+ }
+
+ final String allocatorId = ((PendingSegmentAllocatingTask)
task).getTaskAllocatorId();
+ SegmentIdWithShardSpec allocatedId =
((IndexerSQLMetadataStorageCoordinator) metadataStorageCoordinator)
+ .findOrInsertPendingSegmentRecord(
+ task.getDataSource(),
+ visibleSegmentState.interval,
+ visibleSegmentState.committedVersion,
+
visibleSegmentState.getVersion(lockPosse.taskLock.getVersion()),
+ visibleSegmentState.getMaxId(),
+ visibleSegmentState.numCorePartitions,
+ partialShardSpec,
+ sequenceName,
+ previousSegmentId,
+ skipSegmentLineageCheck,
+ allocatorId
+ );
+
+ if (allocatedId != null) {
+ visibleSegmentState.addPendingSegment(
+ new PendingSegmentRecord(allocatedId, sequenceName,
previousSegmentId, null, allocatorId)
+ );
+ }
+ log.info("Found or Allocated pendingSegment[%s].",
allocatedId.asSegmentId());
Review Comment:
## Dereferenced variable may be null
Variable [allocatedId](1) may be null at this access as suggested by
[this](2) null guard.
[Show more
details](https://github.com/apache/druid/security/code-scanning/8440)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]