pnowojski commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507534949


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##########
@@ -141,6 +150,48 @@ private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boo
         return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
+
+        // Add reporting as callback for when the failure labeling is completed.
+        failureHandlingResult
+                .getFailureLabels()
+                .thenAcceptAsync(
+                        labels -> reportFailureHandling(failureHandlingResult, labels),
+                        mainThreadExecutor);
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "HandleFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(System.currentTimeMillis())
+                        .setAttribute(
+                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+        // Add all failure labels
+        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+            spanBuilder.setAttribute(
+                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue());
+        }
+        metricGroup.addSpan(spanBuilder);

Review Comment:
   As discussed offline: for the time being, let's report failures via spans,
but hide this behind a temporary feature toggle, something like "use spans
instead of events". The option should be disabled by default and marked as
deprecated from the start. Once Flink supports events, we can remove the
option after a release or so.
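
   To illustrate the toggle idea, here is a rough, dependency-free sketch. It is
not Flink code: the class name, the default constant, the `failureLabel.` prefix,
and the in-memory list standing in for the span sink are all made up for this
example. The real option would presumably be a regular Flink config option and
the real sink would be `metricGroup.addSpan(...)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only; none of these names exist in Flink.
class FailureReportingSketch {

    /** Hypothetical toggle default: off, and deprecated from the beginning. */
    @Deprecated
    static final boolean REPORT_FAILURES_AS_SPANS_DEFAULT = false;

    /** Assumed attribute prefix; the real FAILURE_LABEL_ATTRIBUTE_PREFIX may differ. */
    static final String LABEL_PREFIX = "failureLabel.";

    private final boolean reportAsSpans;

    /** Stand-in for the span sink; collects the attributes of each reported span. */
    final List<Map<String, String>> reportedSpans = new ArrayList<>();

    FailureReportingSketch() {
        this(REPORT_FAILURES_AS_SPANS_DEFAULT);
    }

    FailureReportingSketch(boolean reportAsSpans) {
        this.reportAsSpans = reportAsSpans;
    }

    void reportFailureHandling(
            boolean canRestart, boolean isGlobalFailure, Map<String, String> failureLabels) {
        // With the toggle off (the default), nothing is reported as a span.
        if (!reportAsSpans) {
            return;
        }
        // Base attributes, mirroring the ones set in the diff above.
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("canRestart", String.valueOf(canRestart));
        attributes.put("isGlobalFailure", String.valueOf(isGlobalFailure));
        // All failure labels, each under the assumed prefix.
        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
            attributes.put(LABEL_PREFIX + entry.getKey(), entry.getValue());
        }
        reportedSpans.add(attributes);
    }

    public static void main(String[] args) {
        FailureReportingSketch reporter = new FailureReportingSketch(true);
        reporter.reportFailureHandling(true, false, Map.of("type", "USER"));
        System.out.println(reporter.reportedSpans);
    }
}
```

   Keeping the check inside the reporting method means the callback wiring in
`handleFailureAndReport` would not need to change when the option is removed.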



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

