github-advanced-security[bot] commented on code in PR #15141:
URL: https://github.com/apache/druid/pull/15141#discussion_r1446853736
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -596,91 +597,104 @@
? null
: ((AppenderatorDriverMetadata)
metadata).getCallerMetadata();
return executor.submit(
- () -> {
- try {
- final ImmutableSet<DataSegment> ourSegments =
ImmutableSet.copyOf(pushedAndTombstones);
- final SegmentPublishResult publishResult =
publisher.publishSegments(
- segmentsToBeOverwritten,
- ourSegments,
- outputSegmentsAnnotateFunction,
- callerMetadata
- );
-
- if (publishResult.isSuccess()) {
- log.info(
- "Published [%s] segments with commit metadata [%s]",
- segmentsAndCommitMetadata.getSegments().size(),
- callerMetadata
- );
- log.infoSegments(segmentsAndCommitMetadata.getSegments(),
"Published segments");
- } else {
- // Publishing didn't affirmatively succeed. However, segments
with our identifiers may still be active
- // now after all, for two possible reasons:
- //
- // 1) A replica may have beat us to publishing these segments.
In this case we want to delete the
- // segments we pushed (if they had unique paths) to avoid
wasting space on deep storage.
- // 2) We may have actually succeeded, but not realized it due to
missing the confirmation response
- // from the overlord. In this case we do not want to delete
the segments we pushed, since they are
- // now live!
-
- final Set<SegmentIdWithShardSpec> segmentsIdentifiers =
segmentsAndCommitMetadata
- .getSegments()
- .stream()
- .map(SegmentIdWithShardSpec::fromDataSegment)
- .collect(Collectors.toSet());
-
- final Set<DataSegment> activeSegments =
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
- if (activeSegments.equals(ourSegments)) {
- log.info(
- "Could not publish [%s] segments, but checked and found
them already published; continuing.",
- ourSegments.size()
- );
- log.infoSegments(
- segmentsAndCommitMetadata.getSegments(),
- "Could not publish segments"
+ () -> {
+ try {
+ RetryUtils.retry(
+ () -> {
+ try {
+ final ImmutableSet<DataSegment> ourSegments =
ImmutableSet.copyOf(pushedAndTombstones);
+ final SegmentPublishResult publishResult =
publisher.publishSegments(
+ segmentsToBeOverwritten,
+ ourSegments,
+ outputSegmentsAnnotateFunction,
+ callerMetadata
);
- // Clean up pushed segments if they are physically disjoint
from the published ones (this means
- // they were probably pushed by a replica, and with the unique
paths option).
- final boolean physicallyDisjoint = Sets.intersection(
-
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
- ).isEmpty();
-
- if (physicallyDisjoint) {
-
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
- }
- } else {
- // Our segments aren't active. Publish failed for some reason.
Clean them up and then throw an error.
-
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
- if (publishResult.getErrorMsg() != null) {
- log.errorSegments(ourSegments, "Failed to publish segments");
- throw new ISE(
- "Failed to publish segments because of [%s]",
- publishResult.getErrorMsg()
+ if (publishResult.isSuccess()) {
+ log.info(
+ "Published [%s] segments with commit metadata [%s]",
+ segmentsAndCommitMetadata.getSegments().size(),
+ callerMetadata
);
+ log.infoSegments(segmentsAndCommitMetadata.getSegments(),
"Published segments");
} else {
- log.errorSegments(ourSegments, "Failed to publish segments");
- throw new ISE("Failed to publish segments");
+ // Publishing didn't affirmatively succeed. However,
segments with our identifiers may still be active
+ // now after all, for two possible reasons:
+ //
+ // 1) A replica may have beat us to publishing these
segments. In this case we want to delete the
+ // segments we pushed (if they had unique paths) to avoid
wasting space on deep storage.
+ // 2) We may have actually succeeded, but not realized it
due to missing the confirmation response
+ // from the overlord. In this case we do not want to
delete the segments we pushed, since they are
+ // now live!
+
+ final Set<SegmentIdWithShardSpec> segmentsIdentifiers =
segmentsAndCommitMetadata
+ .getSegments()
+ .stream()
+ .map(SegmentIdWithShardSpec::fromDataSegment)
+ .collect(Collectors.toSet());
+
+ final Set<DataSegment> activeSegments =
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+ if (activeSegments.equals(ourSegments)) {
+ log.info(
+ "Could not publish [%s] segments, but checked and
found them already published; continuing.",
+ ourSegments.size()
+ );
+ log.infoSegments(
+ segmentsAndCommitMetadata.getSegments(),
+ "Could not publish segments"
+ );
+
+ // Clean up pushed segments if they are physically
disjoint from the published ones (this means
+ // they were probably pushed by a replica, and with the
unique paths option).
+ final boolean physicallyDisjoint = Sets.intersection(
+
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+ ).isEmpty();
+
+ if (physicallyDisjoint) {
+
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+ }
+ } else {
+ log.errorSegments(ourSegments, "Failed to publish
segments");
+ if (publishResult.getErrorMsg() != null &&
publishResult.getErrorMsg().contains(("Failed to update the metadata Store. The
new start metadata is ahead of last commited end state."))) {
+ throw new ISE(publishResult.getErrorMsg());
+ }
+ // Our segments aren't active. Publish failed for some
reason. Clean them up and then throw an error.
+
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+ if (publishResult.getErrorMsg() != null) {
+ throw new ISE("Failed to publish segments because of
[%s]", publishResult.getErrorMsg());
+ }
+ throw new ISE("Failed to publish segments");
+ }
}
}
- }
- }
- catch (Exception e) {
- // Must not remove segments here, we aren't sure if our
transaction succeeded or not.
- log.noStackTrace().warn(e, "Failed publish");
- log.warnSegments(
- segmentsAndCommitMetadata.getSegments(),
- "Failed publish, not removing segments"
- );
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
+ catch (Exception e) {
+ // Must not remove segments here, we aren't sure if our
transaction succeeded or not.
+ log.noStackTrace().warn(e, "Failed publish");
+ log.warnSegments(
+ segmentsAndCommitMetadata.getSegments(),
+ "Failed publish, not removing segments"
+ );
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
+ }
+ return segmentsAndCommitMetadata;
+ },
+ e -> (e.getMessage() != null && e.getMessage().contains("Failed
to update the metadata Store. The new start metadata is ahead of last commited
end state.")),
+ RetryUtils.DEFAULT_MAX_TRIES
+ );
+ }
+ catch (Exception e) {
+ if (e.getMessage() != null && e.getMessage().contains("Failed to
update the metadata Store. The new start metadata is ahead of last commited end
state.")) {
+ // Publish failed for some reason. Clean them up and then throw an
error.
+
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
}
-
- return segmentsAndCommitMetadata;
+ Throwables.propagateIfPossible(e);
Review Comment:
## Deprecated method or constructor invocation
Invoking [Throwables.propagateIfPossible](1) should be avoided because it
has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/6435)
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -596,91 +597,104 @@
? null
: ((AppenderatorDriverMetadata)
metadata).getCallerMetadata();
return executor.submit(
- () -> {
- try {
- final ImmutableSet<DataSegment> ourSegments =
ImmutableSet.copyOf(pushedAndTombstones);
- final SegmentPublishResult publishResult =
publisher.publishSegments(
- segmentsToBeOverwritten,
- ourSegments,
- outputSegmentsAnnotateFunction,
- callerMetadata
- );
-
- if (publishResult.isSuccess()) {
- log.info(
- "Published [%s] segments with commit metadata [%s]",
- segmentsAndCommitMetadata.getSegments().size(),
- callerMetadata
- );
- log.infoSegments(segmentsAndCommitMetadata.getSegments(),
"Published segments");
- } else {
- // Publishing didn't affirmatively succeed. However, segments
with our identifiers may still be active
- // now after all, for two possible reasons:
- //
- // 1) A replica may have beat us to publishing these segments.
In this case we want to delete the
- // segments we pushed (if they had unique paths) to avoid
wasting space on deep storage.
- // 2) We may have actually succeeded, but not realized it due to
missing the confirmation response
- // from the overlord. In this case we do not want to delete
the segments we pushed, since they are
- // now live!
-
- final Set<SegmentIdWithShardSpec> segmentsIdentifiers =
segmentsAndCommitMetadata
- .getSegments()
- .stream()
- .map(SegmentIdWithShardSpec::fromDataSegment)
- .collect(Collectors.toSet());
-
- final Set<DataSegment> activeSegments =
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
- if (activeSegments.equals(ourSegments)) {
- log.info(
- "Could not publish [%s] segments, but checked and found
them already published; continuing.",
- ourSegments.size()
- );
- log.infoSegments(
- segmentsAndCommitMetadata.getSegments(),
- "Could not publish segments"
+ () -> {
+ try {
+ RetryUtils.retry(
+ () -> {
+ try {
+ final ImmutableSet<DataSegment> ourSegments =
ImmutableSet.copyOf(pushedAndTombstones);
+ final SegmentPublishResult publishResult =
publisher.publishSegments(
+ segmentsToBeOverwritten,
+ ourSegments,
+ outputSegmentsAnnotateFunction,
+ callerMetadata
);
- // Clean up pushed segments if they are physically disjoint
from the published ones (this means
- // they were probably pushed by a replica, and with the unique
paths option).
- final boolean physicallyDisjoint = Sets.intersection(
-
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
-
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
- ).isEmpty();
-
- if (physicallyDisjoint) {
-
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
- }
- } else {
- // Our segments aren't active. Publish failed for some reason.
Clean them up and then throw an error.
-
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
- if (publishResult.getErrorMsg() != null) {
- log.errorSegments(ourSegments, "Failed to publish segments");
- throw new ISE(
- "Failed to publish segments because of [%s]",
- publishResult.getErrorMsg()
+ if (publishResult.isSuccess()) {
+ log.info(
+ "Published [%s] segments with commit metadata [%s]",
+ segmentsAndCommitMetadata.getSegments().size(),
+ callerMetadata
);
+ log.infoSegments(segmentsAndCommitMetadata.getSegments(),
"Published segments");
} else {
- log.errorSegments(ourSegments, "Failed to publish segments");
- throw new ISE("Failed to publish segments");
+ // Publishing didn't affirmatively succeed. However,
segments with our identifiers may still be active
+ // now after all, for two possible reasons:
+ //
+ // 1) A replica may have beat us to publishing these
segments. In this case we want to delete the
+ // segments we pushed (if they had unique paths) to avoid
wasting space on deep storage.
+ // 2) We may have actually succeeded, but not realized it
due to missing the confirmation response
+ // from the overlord. In this case we do not want to
delete the segments we pushed, since they are
+ // now live!
+
+ final Set<SegmentIdWithShardSpec> segmentsIdentifiers =
segmentsAndCommitMetadata
+ .getSegments()
+ .stream()
+ .map(SegmentIdWithShardSpec::fromDataSegment)
+ .collect(Collectors.toSet());
+
+ final Set<DataSegment> activeSegments =
usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+ if (activeSegments.equals(ourSegments)) {
+ log.info(
+ "Could not publish [%s] segments, but checked and
found them already published; continuing.",
+ ourSegments.size()
+ );
+ log.infoSegments(
+ segmentsAndCommitMetadata.getSegments(),
+ "Could not publish segments"
+ );
+
+ // Clean up pushed segments if they are physically
disjoint from the published ones (this means
+ // they were probably pushed by a replica, and with the
unique paths option).
+ final boolean physicallyDisjoint = Sets.intersection(
+
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+ ).isEmpty();
+
+ if (physicallyDisjoint) {
+
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+ }
+ } else {
+ log.errorSegments(ourSegments, "Failed to publish
segments");
+ if (publishResult.getErrorMsg() != null &&
publishResult.getErrorMsg().contains(("Failed to update the metadata Store. The
new start metadata is ahead of last commited end state."))) {
+ throw new ISE(publishResult.getErrorMsg());
+ }
+ // Our segments aren't active. Publish failed for some
reason. Clean them up and then throw an error.
+
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+ if (publishResult.getErrorMsg() != null) {
+ throw new ISE("Failed to publish segments because of
[%s]", publishResult.getErrorMsg());
+ }
+ throw new ISE("Failed to publish segments");
+ }
}
}
- }
- }
- catch (Exception e) {
- // Must not remove segments here, we aren't sure if our
transaction succeeded or not.
- log.noStackTrace().warn(e, "Failed publish");
- log.warnSegments(
- segmentsAndCommitMetadata.getSegments(),
- "Failed publish, not removing segments"
- );
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
+ catch (Exception e) {
+ // Must not remove segments here, we aren't sure if our
transaction succeeded or not.
+ log.noStackTrace().warn(e, "Failed publish");
+ log.warnSegments(
+ segmentsAndCommitMetadata.getSegments(),
+ "Failed publish, not removing segments"
+ );
+ Throwables.propagateIfPossible(e);
Review Comment:
## Deprecated method or constructor invocation
Invoking [Throwables.propagateIfPossible](1) should be avoided because it
has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/6434)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]