kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1567653351
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
}
/**
- * Tries to upgrade any pending segments that overlap with the committed segments.
+ * Registers upgraded pending segments on the active supervisor, if any
*/
- private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+ private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
if (!activeSupervisorIdWithAppendLock.isPresent()) {
return;
}
- final Set<String> activeRealtimeSequencePrefixes
- = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
- Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
- toolbox.getIndexerMetadataStorageCoordinator()
- .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
- log.info(
- "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
- upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
- );
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
- upgradedPendingSegments.forEach(
- (oldId, newId) -> toolbox.getSupervisorManager()
- .registerNewVersionOfPendingSegmentOnSupervisor(
- activeSupervisorIdWithAppendLock.get(),
- oldId,
- newId
- )
+ Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ pendingSegments.addAll(
+ toolbox.getIndexerMetadataStorageCoordinator()
+ .getPendingSegments(task.getDataSource(), replaceLock.getInterval())
+ );
+ }
+ Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
+ pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
+ pendingSegment.getId().asSegmentId().toString(),
+ pendingSegment.getId()
+ ));
+ Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
+ pendingSegments.forEach(pendingSegment -> {
+ if (pendingSegment.getUpgradedFromSegmentId() != null
+ && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
Review Comment:
Can the `upgradedFromSegmentId` ever be equal to the `id` itself?
A normal/root pending segment (i.e. one created by allocation and not upgrade) would have `upgraded_from_segment_id` as `null`, right?
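To make the question concrete: if root pending segments (created by allocation) always store `null` as their parent id, then the `null` check alone is enough to pick out upgraded segments, and the self-equality guard can never fire. The sketch below assumes exactly that; `PendingSegment` and `ParentMapSketch` are hypothetical simplified stand-ins for `PendingSegmentRecord` and the map-building step, with ids reduced to plain strings.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PendingSegmentRecord: only the two fields relevant here.
final class PendingSegment
{
  final String id;
  // Assumed null for a root segment created by allocation; set only on upgraded copies.
  final String upgradedFromSegmentId;

  PendingSegment(String id, String upgradedFromSegmentId)
  {
    this.id = id;
    this.upgradedFromSegmentId = upgradedFromSegmentId;
  }
}

final class ParentMapSketch
{
  /**
   * Maps each upgraded pending segment id to the id it was upgraded from.
   * Under the assumption above, roots are skipped by the null check alone,
   * so no "parent equals self" comparison is needed.
   */
  static Map<String, String> childToParent(Collection<PendingSegment> segments)
  {
    final Map<String, String> childToParent = new HashMap<>();
    for (PendingSegment segment : segments) {
      if (segment.upgradedFromSegmentId != null) {
        childToParent.put(segment.id, segment.upgradedFromSegmentId);
      }
    }
    return childToParent;
  }
}
```

If the assumption does not hold (some code path writes a segment's own id into `upgraded_from_segment_id`), the extra equality check would still be needed, which is what the question is trying to establish.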
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java:
##########
@@ -45,7 +46,7 @@
import java.util.Set;
@JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask
+public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
Review Comment:
This class need not implement `PendingSegmentAllocatingTask` as it never actually does any allocation. The allocation is always done by the controller task.
This can be addressed in a follow-up PR.
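The division of responsibility described above can be sketched as follows: only the task that actually allocates pending segments exposes an allocator id, and workers reach allocation only by delegating to their controller. All names here are hypothetical simplifications; the single `getTaskAllocatorId()` method is an assumption based on the `taskAllocatorId` parameter introduced elsewhere in this PR, and the returned id format is made up.

```java
// Hypothetical, simplified version of the interface under discussion.
interface PendingSegmentAllocatingTask
{
  // Id under which this task (and its replicas or sub-tasks) allocates pending segments.
  String getTaskAllocatorId();
}

// The controller performs every allocation, so it is the one that implements the interface.
final class ControllerTaskSketch implements PendingSegmentAllocatingTask
{
  private final String id;
  private int nextPartition = 0;

  ControllerTaskSketch(String id)
  {
    this.id = id;
  }

  @Override
  public String getTaskAllocatorId()
  {
    return id;
  }

  // Each allocation is tagged with the controller's allocator id (made-up id format).
  String allocate(String interval)
  {
    return interval + "_" + (nextPartition++) + "@" + getTaskAllocatorId();
  }
}

// Workers never allocate on their own; they delegate to the controller,
// so they have no reason to implement PendingSegmentAllocatingTask.
final class WorkerTaskSketch
{
  private final ControllerTaskSketch controller;

  WorkerTaskSketch(ControllerTaskSketch controller)
  {
    this.controller = controller;
  }

  String requestSegment(String interval)
  {
    return controller.allocate(interval);
  }
}
```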
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
Review Comment:
Some comments here would be helpful.
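As an example of the kind of comments that could help, the first two steps of the new method (filter the datasource's REPLACE locks down to this task, then gather pending segments in the locked intervals) are annotated below. This is a simplified, self-contained sketch: `ReplaceLockStub` stands in for `ReplaceTaskLock`, the `pendingSegmentIdsByInterval` map stands in for `getPendingSegments(dataSource, interval)`, and intervals and segment ids are plain strings. None of these names are from the PR.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for ReplaceTaskLock: the owning task and the locked interval.
final class ReplaceLockStub
{
  final String supervisorTaskId;
  final String interval;

  ReplaceLockStub(String supervisorTaskId, String interval)
  {
    this.supervisorTaskId = supervisorTaskId;
    this.interval = interval;
  }
}

final class CommentedStepsSketch
{
  static Set<String> pendingSegmentsUnderTaskLocks(
      String taskId,
      Collection<ReplaceLockStub> allReplaceLocksForDatasource,
      Map<String, List<String>> pendingSegmentIdsByInterval
  )
  {
    // Step 1: keep only the REPLACE locks held by this task. Other REPLACE tasks
    // on the same datasource may hold locks on other intervals.
    final Set<String> lockedIntervals = allReplaceLocksForDatasource
        .stream()
        .filter(lock -> taskId.equals(lock.supervisorTaskId))
        .map(lock -> lock.interval)
        .collect(Collectors.toSet());

    // Step 2: collect the pending segments in every interval locked by this task.
    // Using a Set means a segment returned for more than one lock is kept only once.
    final Set<String> pendingSegmentIds = new HashSet<>();
    for (String interval : lockedIntervals) {
      pendingSegmentIds.addAll(pendingSegmentIdsByInterval.getOrDefault(interval, List.of()));
    }
    return pendingSegmentIds;
  }
}
```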
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -340,7 +330,7 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor(
return true;
}
catch (Exception e) {
- log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+ log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
Review Comment:
This change is not needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java:
##########
@@ -39,7 +39,7 @@
/**
*/
-public class NoopTask extends AbstractTask
+public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask
Review Comment:
Does `NoopTask` need to implement the new interface for the purpose of tests?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
+ pendingSegments.forEach(pendingSegment -> {
+ if (pendingSegment.getUpgradedFromSegmentId() != null
Review Comment:
Should we only look at pending segments that were upgraded by this task rather than all upgraded pending segments?
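One possible reading of this suggestion, sketched under assumptions: instead of re-reading every pending segment in the locked intervals and treating any record with a non-null parent as relevant, register only the old-to-new id pairs produced by this task's own upgrade step. The removed `upgradePendingSegmentsOverlappingWith` call returned such a map, which the old code then passed to `registerNewVersionOfPendingSegmentOnSupervisor`. The class name, the `Registrar` callback, and the string ids below are hypothetical simplifications.

```java
import java.util.Map;

final class UpgradeRegistrationSketch
{
  // Hypothetical callback standing in for registerNewVersionOfPendingSegmentOnSupervisor.
  interface Registrar
  {
    void register(String supervisorId, String oldId, String newId);
  }

  /**
   * Registers only the upgrades created by this task. The map is assumed to be the
   * old-id -> new-id result of this task's own upgrade step, so pending segments
   * upgraded by other REPLACE tasks are never registered again here.
   *
   * @return number of upgraded pending segments registered
   */
  static int registerOwnUpgrades(
      String supervisorId,
      Map<String, String> upgradedByThisTask,
      Registrar registrar
  )
  {
    upgradedByThisTask.forEach((oldId, newId) -> registrar.register(supervisorId, oldId, newId));
    return upgradedByThisTask.size();
  }
}
```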
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1073,7 +1057,9 @@ private SegmentIdWithShardSpec allocatePendingSegment(
);
// always insert empty previous sequence id
- insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
+ insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1,
+ taskAllocatorId
Review Comment:
Formatting is off.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly(
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
+
+ /**
+ * Delete pending segment for a give task group after all the tasks belonging to it have completed.
+ * @param taskAllocatorId task id / task group / replica group for an appending task
+ * @return number of pending segments deleted from the metadata store
+ */
+ int deletePendingSegmentsForTaskGroup(String taskAllocatorId);
Review Comment:
Method needs to be renamed.
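For illustration, a name that matches the `taskAllocatorId` parameter might fit better, since the Javadoc says the id can be a task id, task group, or replica group, which makes "TaskGroup" too narrow. The name `deletePendingSegmentsForTaskAllocatorId`, the `PendingSegmentStore` interface, and the in-memory implementation below are hypothetical; they only demonstrate the intended contract of deleting every pending segment tagged with a given allocator id and returning the count.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified view of the coordinator method after a rename.
interface PendingSegmentStore
{
  /**
   * Deletes all pending segments allocated under the given taskAllocatorId
   * (a task id, task group, or replica group of an appending task).
   *
   * @return number of pending segments deleted
   */
  int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
}

// In-memory implementation used only to demonstrate the contract.
final class InMemoryPendingSegmentStore implements PendingSegmentStore
{
  // pending segment id -> taskAllocatorId that allocated it
  private final Map<String, String> allocatorBySegmentId = new HashMap<>();

  void add(String segmentId, String taskAllocatorId)
  {
    allocatorBySegmentId.put(segmentId, taskAllocatorId);
  }

  @Override
  public int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId)
  {
    int deleted = 0;
    final Iterator<Map.Entry<String, String>> it = allocatorBySegmentId.entrySet().iterator();
    while (it.hasNext()) {
      if (taskAllocatorId.equals(it.next().getValue())) {
        it.remove();
        deleted++;
      }
    }
    return deleted;
  }
}
```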
--
This is an automated message from the Apache Git Service.