StefanRRichter commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507454476


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##########
@@ -141,6 +150,48 @@ private CompletableFuture<Map<String, String>> 
labelFailure(Throwable cause, boo
         return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, 
verticesToRestart, globalFailure);
+
+        // Add reporting as callback for when the failure labeling is 
completed.
+        failureHandlingResult
+                .getFailureLabels()
+                .thenAcceptAsync(
+                        labels -> reportFailureHandling(failureHandlingResult, 
labels),
+                        mainThreadExecutor);
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> 
failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "HandleFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(System.currentTimeMillis())
+                        .setAttribute(
+                                "canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                
String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+        // Add all failure labels
+        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+            spanBuilder.setAttribute(
+                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), 
entry.getValue());
+        }
+        metricGroup.addSpan(spanBuilder);

Review Comment:
   How can we report events? Didn't we say that functionality doesn't exist, 
yet and that's why we use spans until we can replace them with events?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to