github-advanced-security[bot] commented on code in PR #15141:
URL: https://github.com/apache/druid/pull/15141#discussion_r1446853736


##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -596,91 +597,104 @@
                                   ? null
                                   : ((AppenderatorDriverMetadata) 
metadata).getCallerMetadata();
     return executor.submit(
-        () -> {
-          try {
-            final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
-            final SegmentPublishResult publishResult = 
publisher.publishSegments(
-                segmentsToBeOverwritten,
-                ourSegments,
-                outputSegmentsAnnotateFunction,
-                callerMetadata
-            );
-
-            if (publishResult.isSuccess()) {
-              log.info(
-                  "Published [%s] segments with commit metadata [%s]",
-                  segmentsAndCommitMetadata.getSegments().size(),
-                  callerMetadata
-              );
-              log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
-            } else {
-              // Publishing didn't affirmatively succeed. However, segments 
with our identifiers may still be active
-              // now after all, for two possible reasons:
-              //
-              // 1) A replica may have beat us to publishing these segments. 
In this case we want to delete the
-              //    segments we pushed (if they had unique paths) to avoid 
wasting space on deep storage.
-              // 2) We may have actually succeeded, but not realized it due to 
missing the confirmation response
-              //    from the overlord. In this case we do not want to delete 
the segments we pushed, since they are
-              //    now live!
-
-              final Set<SegmentIdWithShardSpec> segmentsIdentifiers = 
segmentsAndCommitMetadata
-                  .getSegments()
-                  .stream()
-                  .map(SegmentIdWithShardSpec::fromDataSegment)
-                  .collect(Collectors.toSet());
-
-              final Set<DataSegment> activeSegments = 
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
-              if (activeSegments.equals(ourSegments)) {
-                log.info(
-                    "Could not publish [%s] segments, but checked and found 
them already published; continuing.",
-                    ourSegments.size()
-                );
-                log.infoSegments(
-                    segmentsAndCommitMetadata.getSegments(),
-                    "Could not publish segments"
+      () -> {
+        try {
+          RetryUtils.retry(
+              () -> {
+              try {
+                final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
+                final SegmentPublishResult publishResult = 
publisher.publishSegments(
+                    segmentsToBeOverwritten,
+                    ourSegments,
+                    outputSegmentsAnnotateFunction,
+                    callerMetadata
                 );
 
-                // Clean up pushed segments if they are physically disjoint 
from the published ones (this means
-                // they were probably pushed by a replica, and with the unique 
paths option).
-                final boolean physicallyDisjoint = Sets.intersection(
-                    
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-                    
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
-                ).isEmpty();
-
-                if (physicallyDisjoint) {
-                  
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-                }
-              } else {
-                // Our segments aren't active. Publish failed for some reason. 
Clean them up and then throw an error.
-                
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
-                if (publishResult.getErrorMsg() != null) {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE(
-                      "Failed to publish segments because of [%s]",
-                      publishResult.getErrorMsg()
+                if (publishResult.isSuccess()) {
+                  log.info(
+                      "Published [%s] segments with commit metadata [%s]",
+                      segmentsAndCommitMetadata.getSegments().size(),
+                      callerMetadata
                   );
+                  log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
                 } else {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE("Failed to publish segments");
+                  // Publishing didn't affirmatively succeed. However, 
segments with our identifiers may still be active
+                  // now after all, for two possible reasons:
+                  //
+                  // 1) A replica may have beat us to publishing these 
segments. In this case we want to delete the
+                  //    segments we pushed (if they had unique paths) to avoid 
wasting space on deep storage.
+                  // 2) We may have actually succeeded, but not realized it 
due to missing the confirmation response
+                  //    from the overlord. In this case we do not want to 
delete the segments we pushed, since they are
+                  //    now live!
+
+                  final Set<SegmentIdWithShardSpec> segmentsIdentifiers = 
segmentsAndCommitMetadata
+                      .getSegments()
+                      .stream()
+                      .map(SegmentIdWithShardSpec::fromDataSegment)
+                      .collect(Collectors.toSet());
+
+                  final Set<DataSegment> activeSegments = 
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+                  if (activeSegments.equals(ourSegments)) {
+                    log.info(
+                        "Could not publish [%s] segments, but checked and 
found them already published; continuing.",
+                        ourSegments.size()
+                    );
+                    log.infoSegments(
+                        segmentsAndCommitMetadata.getSegments(),
+                        "Could not publish segments"
+                    );
+
+                    // Clean up pushed segments if they are physically 
disjoint from the published ones (this means
+                    // they were probably pushed by a replica, and with the 
unique paths option).
+                    final boolean physicallyDisjoint = Sets.intersection(
+                        
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                        
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+                    ).isEmpty();
+
+                    if (physicallyDisjoint) {
+                      
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                    }
+                  } else {
+                    log.errorSegments(ourSegments, "Failed to publish 
segments");
+                    if (publishResult.getErrorMsg() != null && 
publishResult.getErrorMsg().contains(("Failed to update the metadata Store. The 
new start metadata is ahead of last commited end state."))) {
+                      throw new ISE(publishResult.getErrorMsg());
+                    }
+                    // Our segments aren't active. Publish failed for some 
reason. Clean them up and then throw an error.
+                    
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                    if (publishResult.getErrorMsg() != null) {
+                      throw new ISE("Failed to publish segments because of 
[%s]", publishResult.getErrorMsg());
+                    }
+                    throw new ISE("Failed to publish segments");
+                  }
                 }
               }
-            }
-          }
-          catch (Exception e) {
-            // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
-            log.noStackTrace().warn(e, "Failed publish");
-            log.warnSegments(
-                segmentsAndCommitMetadata.getSegments(),
-                "Failed publish, not removing segments"
-            );
-            Throwables.propagateIfPossible(e);
-            throw new RuntimeException(e);
+              catch (Exception e) {
+                // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
+                log.noStackTrace().warn(e, "Failed publish");
+                log.warnSegments(
+                    segmentsAndCommitMetadata.getSegments(),
+                    "Failed publish, not removing segments"
+                );
+                Throwables.propagateIfPossible(e);
+                throw new RuntimeException(e);
+              }
+              return segmentsAndCommitMetadata;
+            },
+              e -> (e.getMessage() != null && e.getMessage().contains("Failed 
to update the metadata Store. The new start metadata is ahead of last commited 
end state.")),
+              RetryUtils.DEFAULT_MAX_TRIES
+          );
+        }
+        catch (Exception e) {
+          if (e.getMessage() != null && e.getMessage().contains("Failed to 
update the metadata Store. The new start metadata is ahead of last commited end 
state.")) {
+            // Publish failed for some reason. Clean them up and then throw an 
error.
+            
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
           }
-
-          return segmentsAndCommitMetadata;
+          Throwables.propagateIfPossible(e);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Throwables.propagateIfPossible](1) should be avoided because it 
has been deprecated. Prefer `Throwables.throwIfUnchecked(e)` followed by an 
explicit `throw new RuntimeException(e)`, which makes the rethrow behavior 
explicit.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/6435)



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -596,91 +597,104 @@
                                   ? null
                                   : ((AppenderatorDriverMetadata) 
metadata).getCallerMetadata();
     return executor.submit(
-        () -> {
-          try {
-            final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
-            final SegmentPublishResult publishResult = 
publisher.publishSegments(
-                segmentsToBeOverwritten,
-                ourSegments,
-                outputSegmentsAnnotateFunction,
-                callerMetadata
-            );
-
-            if (publishResult.isSuccess()) {
-              log.info(
-                  "Published [%s] segments with commit metadata [%s]",
-                  segmentsAndCommitMetadata.getSegments().size(),
-                  callerMetadata
-              );
-              log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
-            } else {
-              // Publishing didn't affirmatively succeed. However, segments 
with our identifiers may still be active
-              // now after all, for two possible reasons:
-              //
-              // 1) A replica may have beat us to publishing these segments. 
In this case we want to delete the
-              //    segments we pushed (if they had unique paths) to avoid 
wasting space on deep storage.
-              // 2) We may have actually succeeded, but not realized it due to 
missing the confirmation response
-              //    from the overlord. In this case we do not want to delete 
the segments we pushed, since they are
-              //    now live!
-
-              final Set<SegmentIdWithShardSpec> segmentsIdentifiers = 
segmentsAndCommitMetadata
-                  .getSegments()
-                  .stream()
-                  .map(SegmentIdWithShardSpec::fromDataSegment)
-                  .collect(Collectors.toSet());
-
-              final Set<DataSegment> activeSegments = 
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
-              if (activeSegments.equals(ourSegments)) {
-                log.info(
-                    "Could not publish [%s] segments, but checked and found 
them already published; continuing.",
-                    ourSegments.size()
-                );
-                log.infoSegments(
-                    segmentsAndCommitMetadata.getSegments(),
-                    "Could not publish segments"
+      () -> {
+        try {
+          RetryUtils.retry(
+              () -> {
+              try {
+                final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
+                final SegmentPublishResult publishResult = 
publisher.publishSegments(
+                    segmentsToBeOverwritten,
+                    ourSegments,
+                    outputSegmentsAnnotateFunction,
+                    callerMetadata
                 );
 
-                // Clean up pushed segments if they are physically disjoint 
from the published ones (this means
-                // they were probably pushed by a replica, and with the unique 
paths option).
-                final boolean physicallyDisjoint = Sets.intersection(
-                    
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-                    
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
-                ).isEmpty();
-
-                if (physicallyDisjoint) {
-                  
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-                }
-              } else {
-                // Our segments aren't active. Publish failed for some reason. 
Clean them up and then throw an error.
-                
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
-                if (publishResult.getErrorMsg() != null) {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE(
-                      "Failed to publish segments because of [%s]",
-                      publishResult.getErrorMsg()
+                if (publishResult.isSuccess()) {
+                  log.info(
+                      "Published [%s] segments with commit metadata [%s]",
+                      segmentsAndCommitMetadata.getSegments().size(),
+                      callerMetadata
                   );
+                  log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
                 } else {
-                  log.errorSegments(ourSegments, "Failed to publish segments");
-                  throw new ISE("Failed to publish segments");
+                  // Publishing didn't affirmatively succeed. However, 
segments with our identifiers may still be active
+                  // now after all, for two possible reasons:
+                  //
+                  // 1) A replica may have beat us to publishing these 
segments. In this case we want to delete the
+                  //    segments we pushed (if they had unique paths) to avoid 
wasting space on deep storage.
+                  // 2) We may have actually succeeded, but not realized it 
due to missing the confirmation response
+                  //    from the overlord. In this case we do not want to 
delete the segments we pushed, since they are
+                  //    now live!
+
+                  final Set<SegmentIdWithShardSpec> segmentsIdentifiers = 
segmentsAndCommitMetadata
+                      .getSegments()
+                      .stream()
+                      .map(SegmentIdWithShardSpec::fromDataSegment)
+                      .collect(Collectors.toSet());
+
+                  final Set<DataSegment> activeSegments = 
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+                  if (activeSegments.equals(ourSegments)) {
+                    log.info(
+                        "Could not publish [%s] segments, but checked and 
found them already published; continuing.",
+                        ourSegments.size()
+                    );
+                    log.infoSegments(
+                        segmentsAndCommitMetadata.getSegments(),
+                        "Could not publish segments"
+                    );
+
+                    // Clean up pushed segments if they are physically 
disjoint from the published ones (this means
+                    // they were probably pushed by a replica, and with the 
unique paths option).
+                    final boolean physicallyDisjoint = Sets.intersection(
+                        
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                        
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+                    ).isEmpty();
+
+                    if (physicallyDisjoint) {
+                      
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                    }
+                  } else {
+                    log.errorSegments(ourSegments, "Failed to publish 
segments");
+                    if (publishResult.getErrorMsg() != null && 
publishResult.getErrorMsg().contains(("Failed to update the metadata Store. The 
new start metadata is ahead of last commited end state."))) {
+                      throw new ISE(publishResult.getErrorMsg());
+                    }
+                    // Our segments aren't active. Publish failed for some 
reason. Clean them up and then throw an error.
+                    
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                    if (publishResult.getErrorMsg() != null) {
+                      throw new ISE("Failed to publish segments because of 
[%s]", publishResult.getErrorMsg());
+                    }
+                    throw new ISE("Failed to publish segments");
+                  }
                 }
               }
-            }
-          }
-          catch (Exception e) {
-            // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
-            log.noStackTrace().warn(e, "Failed publish");
-            log.warnSegments(
-                segmentsAndCommitMetadata.getSegments(),
-                "Failed publish, not removing segments"
-            );
-            Throwables.propagateIfPossible(e);
-            throw new RuntimeException(e);
+              catch (Exception e) {
+                // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
+                log.noStackTrace().warn(e, "Failed publish");
+                log.warnSegments(
+                    segmentsAndCommitMetadata.getSegments(),
+                    "Failed publish, not removing segments"
+                );
+                Throwables.propagateIfPossible(e);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Throwables.propagateIfPossible](1) should be avoided because it 
has been deprecated. Prefer `Throwables.throwIfUnchecked(e)` followed by an 
explicit `throw new RuntimeException(e)`, which makes the rethrow behavior 
explicit.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/6434)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to