kfaraz commented on code in PR #15039:
URL: https://github.com/apache/druid/pull/15039#discussion_r1354159462
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -111,24 +114,42 @@ public SegmentPublishResult perform(Task task,
TaskActionToolbox toolbox)
throw new RuntimeException(e);
}
- // Emit metrics
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
- if (retVal.isSuccess()) {
- toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success",
1));
+ IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
- for (DataSegment segment : retVal.getSegments()) {
- final String partitionType = segment.getShardSpec() == null ? null :
segment.getShardSpec().getType();
- metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE,
partitionType);
- metricBuilder.setDimension(DruidMetrics.INTERVAL,
segment.getInterval().toString());
-
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes",
segment.getSize()));
+ final Set<String> activeSupervisorIds = new HashSet<>();
+ if (toolbox.getSupervisorManager() != null) {
+ activeSupervisorIds.addAll(
+
toolbox.getSupervisorManager().getSeekableStreamSupervisorIdsForDatasource(task.getDataSource())
+ );
+ }
+ if (publishResult.isSuccess() && !activeSupervisorIds.isEmpty()) {
+ // If upgrade of pending segments fails, the segments will still get
upgraded
+ // when the corresponding APPEND task commits the segments.
+ // Thus, the upgrade of pending segments should not be done in the same
+ // transaction as the commit of replace segments and failure to upgrade
+ // pending segments should not affect success of replace commit.
+ try {
+ Set<SegmentIdWithShardSpec> upgradedPendingSegments =
+
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegments(segments);
+ log.info(
+ "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
+ upgradedPendingSegments.size(), task.getId(),
upgradedPendingSegments
+ );
+
+ for (String supervisorId : activeSupervisorIds) {
+ for (SegmentIdWithShardSpec pendingSegment :
upgradedPendingSegments) {
+
toolbox.getSupervisorManager().updatePendingSegmentMapping(supervisorId,
pendingSegment);
+ }
+ }
+
+ // These upgraded pending segments should be forwarded to the
SupervisorManager
Review Comment:
Yeah, this was a TODO for when both the changes were combined. Let me remove
this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]