This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f3598c50c0d [FLINK-31888] Introduce interfaces and utility classes 
related to enrichment/labelling of failures leading to job restart.'
f3598c50c0d is described below

commit f3598c50c0d3dcdf8058b01f13b7eb9fc5954f7c
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Sun Apr 23 18:53:44 2023 -0700

    [FLINK-31888] Introduce interfaces and utility classes related to 
enrichment/labelling of failures leading to job restart.'
    
    This closes #22467
---
 .../generated/all_jobmanager_section.html          |   6 +
 .../generated/job_manager_configuration.html       |   6 +
 .../flink/configuration/JobManagerOptions.java     |  24 ++
 .../apache/flink/core/failure/FailureEnricher.java | 128 ++++++++++
 .../flink/core/failure/FailureEnricherFactory.java |  36 +++
 .../failure/DefaultFailureEnricherContext.java     | 116 +++++++++
 .../runtime/failure/FailureEnricherUtils.java      | 228 +++++++++++++++++
 .../runtime/failure/FailureEnricherUtilsTest.java  | 281 +++++++++++++++++++++
 8 files changed, 825 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 7d58542c0e6..7ed59b46827 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -44,6 +44,12 @@
             <td>String</td>
             <td>This option specifies how the job computation recovers from 
task failures. Accepted values are:<ul><li>'full': Restarts all tasks to 
recover the job.</li><li>'region': Restarts all tasks that could be affected by 
the task failure. More details can be found <a 
href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>jobmanager.failure-enrichers</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>An optional list of failure enricher names. If empty, NO 
failure enrichers will be started. If configured, only enrichers whose name 
matches any of the names in the list will be started.</td>
+        </tr>
         <tr>
             <td><h5>jobmanager.future-pool.size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html 
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index 78adcbdfa3f..aa97c181def 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -50,6 +50,12 @@
             <td>String</td>
             <td>This option specifies how the job computation recovers from 
task failures. Accepted values are:<ul><li>'full': Restarts all tasks to 
recover the job.</li><li>'region': Restarts all tasks that could be affected by 
the task failure. More details can be found <a 
href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>jobmanager.failure-enrichers</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>An optional list of failure enricher names. If empty, NO 
failure enrichers will be started. If configured, only enrichers whose name 
matches any of the names in the list will be started.</td>
+        </tr>
         <tr>
             <td><h5>jobmanager.future-pool.size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 61e0c6ea453..a421c7cded4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -263,6 +263,30 @@ public class JobManagerOptions {
                     .withDescription(
                             "The maximum number of historical execution 
attempts kept in history.");
 
+    /**
+     * Comma-separated list of Failure Enricher names to load at JobManager startup. If empty, NO
+     * enrichers will be started. If configured, only enrichers whose fully-qualified class name (as
+     * returned by {@code getClass().getName()} on the created enricher) matches one of the names in
+     * the list will be started; whitespace around the separating commas is ignored.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.TypeFailureEnricher, org.apache.flink.runtime.failure.FailureEnricherUtilsTest$TestEnricher
+     *
+     * }</pre>
+     */
+    @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+    public static final ConfigOption<String> FAILURE_ENRICHERS_LIST =
+            key("jobmanager.failure-enrichers")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "An optional list of failure enricher names."
+                                    + " If empty, NO failure enrichers will be started."
+                                    + " If configured, only enrichers whose name matches"
+                                    + " any of the names in the list will be started.");
+
     /**
      * This option specifies the failover strategy, i.e. how the job 
computation recovers from task
      * failures.
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java 
b/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java
new file mode 100644
index 00000000000..5bec001c101
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.failure;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Failure Enricher enabling custom logic and attaching metadata in the form of labels to each type
+ * of failure as tracked in the JobMaster.
+ */
+@Experimental
+public interface FailureEnricher {
+
+    /**
+     * Method to list all the label Keys the enricher can associate with Values in case of a failure
+     * ({@link #processFailure}). Note that Keys must be unique across enrichers; enrichers declaring
+     * a Key that another enricher also declares will be ignored.
+     *
+     * @return the unique label Keys of the FailureEnricher
+     */
+    Set<String> getOutputKeys();
+
+    /**
+     * Method to handle a failure as part of the enricher and optionally return a map of KV pairs
+     * (labels). Note that Values should only be associated with Keys from {@link #getOutputKeys};
+     * labels with any other Key will be ignored.
+     *
+     * @param cause the exception that caused this failure
+     * @param context the context that includes extra information (e.g., if it was a global failure)
+     * @return a future completing with the map of KV pairs (labels) associated with the failure
+     */
+    CompletableFuture<Map<String, String>> processFailure(
+            final Throwable cause, final Context context);
+
+    /**
+     * An interface used by the {@link FailureEnricher}. Context includes an executor pool for the
+     * enrichers to run heavy operations, the Classloader used for code gen, and other metadata.
+     */
+    @Experimental
+    interface Context {
+
+        /** Type of failure. */
+        enum FailureType {
+            /**
+             * The failure has occurred in the scheduler context and can't be tracked back to a
+             * particular task.
+             */
+            GLOBAL,
+            /** The failure has been reported by a particular task. */
+            TASK,
+            /**
+             * The TaskManager has non-gracefully disconnected from the JobMaster or we have not
+             * received heartbeats for the {@link
+             * org.apache.flink.configuration.HeartbeatManagerOptions#HEARTBEAT_TIMEOUT configured
+             * timeout}.
+             */
+            TASK_MANAGER
+        }
+
+        /**
+         * Get the ID of the job.
+         *
+         * @return the ID of the job
+         */
+        JobID getJobId();
+
+        /**
+         * Get the name of the job.
+         *
+         * @return the name of the job
+         */
+        String getJobName();
+
+        /**
+         * Get the metric group of the JobMaster.
+         *
+         * @return the metric group of the JobMaster
+         */
+        MetricGroup getMetricGroup();
+
+        /**
+         * Return the type of the failure, e.g., a global failure that happened in the scheduler
+         * context.
+         *
+         * @return FailureType
+         */
+        FailureType getFailureType();
+
+        /**
+         * Get the user {@link ClassLoader} used for code generation, UDF loading and other
+         * operations requiring reflections on user code.
+         *
+         * @return the user ClassLoader
+         */
+        ClassLoader getUserClassLoader();
+
+        /**
+         * Get an Executor pool for the Enrichers to run async operations that can potentially be
+         * IO-heavy.
+         *
+         * @return the Executor pool
+         */
+        Executor getIOExecutor();
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java
new file mode 100644
index 00000000000..a94aa8b4369
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.failure;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.Plugin;
+
+/**
+ * Factory for creating {@link FailureEnricher} instances. Implementations are discovered through
+ * the plugin mechanism, which is why this interface extends {@link Plugin}.
+ */
+@Experimental
+public interface FailureEnricherFactory extends Plugin {
+
+    /**
+     * Construct a FailureEnricher.
+     *
+     * <p>{@code FailureEnricherUtils} in flink-runtime selects enrichers by the class name of the
+     * enricher returned here (as listed in {@code jobmanager.failure-enrichers}), not by the class
+     * name of this factory.
+     *
+     * @param conf configuration for this failure enricher
+     * @return the FailureEnricher
+     */
+    FailureEnricher createFailureEnricher(Configuration conf);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/DefaultFailureEnricherContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/DefaultFailureEnricherContext.java
new file mode 100644
index 00000000000..2250a8bd672
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/DefaultFailureEnricherContext.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failure;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricher.Context;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link Context}: a holder with final fields for the job metadata, metric
+ * group, IO executor and user class loader handed to {@link FailureEnricher}s. Instances are created
+ * through the static {@code for*Failure} factory methods, each of which fixes the {@link
+ * FailureType}.
+ */
+public class DefaultFailureEnricherContext implements FailureEnricher.Context {
+    private final JobID jobID;
+    private final String jobName;
+    private final MetricGroup metricGroup;
+    private final FailureType failureType;
+    private final Executor ioExecutor;
+    private final ClassLoader userClassLoader;
+
+    private DefaultFailureEnricherContext(
+            JobID jobID,
+            String jobName,
+            MetricGroup metricGroup,
+            FailureType failureType,
+            Executor ioExecutor,
+            ClassLoader classLoader) {
+        // Only the IO executor is mandatory; every other value is stored exactly as passed in.
+        this.ioExecutor = checkNotNull(ioExecutor);
+        this.jobID = jobID;
+        this.jobName = jobName;
+        this.metricGroup = metricGroup;
+        this.failureType = failureType;
+        this.userClassLoader = classLoader;
+    }
+
+    @Override
+    public JobID getJobId() {
+        return jobID;
+    }
+
+    @Override
+    public String getJobName() {
+        return jobName;
+    }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    @Override
+    public FailureType getFailureType() {
+        return failureType;
+    }
+
+    @Override
+    public ClassLoader getUserClassLoader() {
+        return userClassLoader;
+    }
+
+    @Override
+    public Executor getIOExecutor() {
+        return ioExecutor;
+    }
+
+    /** Factory method returning a Task failure Context for the given params. */
+    public static Context forTaskFailure(
+            JobID jobID,
+            String jobName,
+            MetricGroup metricGroup,
+            Executor ioExecutor,
+            ClassLoader classLoader) {
+        return create(FailureType.TASK, jobID, jobName, metricGroup, ioExecutor, classLoader);
+    }
+
+    /** Factory method returning a Global failure Context for the given params. */
+    public static Context forGlobalFailure(
+            JobID jobID,
+            String jobName,
+            MetricGroup metricGroup,
+            Executor ioExecutor,
+            ClassLoader classLoader) {
+        return create(FailureType.GLOBAL, jobID, jobName, metricGroup, ioExecutor, classLoader);
+    }
+
+    /** Factory method returning a TaskManager failure Context for the given params. */
+    public static Context forTaskManagerFailure(
+            JobID jobID,
+            String jobName,
+            MetricGroup metricGroup,
+            Executor ioExecutor,
+            ClassLoader classLoader) {
+        return create(
+                FailureType.TASK_MANAGER, jobID, jobName, metricGroup, ioExecutor, classLoader);
+    }
+
+    /** Shared construction path of the public factory methods, tagging the given failure type. */
+    private static Context create(
+            FailureType failureType,
+            JobID jobID,
+            String jobName,
+            MetricGroup metricGroup,
+            Executor ioExecutor,
+            ClassLoader classLoader) {
+        return new DefaultFailureEnricherContext(
+                jobID, jobName, metricGroup, failureType, ioExecutor, classLoader);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
new file mode 100644
index 00000000000..e9a111405a7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failure;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricher.Context;
+import org.apache.flink.core.failure.FailureEnricherFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Utils class for loading and running pluggable failure enrichers. */
+public class FailureEnricherUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailureEnricherUtils.class);
+    // regex pattern to split the configured failure enricher list on commas (ignoring whitespace)
+    private static final Pattern ENRICHER_LIST_PATTERN = Pattern.compile("\\s*,\\s*");
+    // placeholder logged when the code location of an enricher class cannot be determined
+    private static final String UNKNOWN_LOCATION = "<unknown location>";
+    static final String MERGE_EXCEPTION_MSG =
+            "Trying to merge a label with a duplicate key %s. This is a bug that should be reported,"
+                    + " because Flink shouldn't allow registering enrichers with the same output.";
+
+    /**
+     * Returns a set of validated FailureEnrichers for a given configuration.
+     *
+     * <p>Enricher factories are discovered through a plugin manager created from the plugin root
+     * folder of the given configuration.
+     *
+     * @param configuration the configuration for the job
+     * @return a collection of validated FailureEnrichers
+     */
+    public static Collection<FailureEnricher> getFailureEnrichers(
+            final Configuration configuration) {
+        final PluginManager pluginManager =
+                PluginUtils.createPluginManagerFromRootFolder(configuration);
+        return getFailureEnrichers(configuration, pluginManager);
+    }
+
+    /**
+     * Loads all {@link FailureEnricherFactory} plugins, keeps the enrichers whose class name is
+     * listed in {@link JobManagerOptions#FAILURE_ENRICHERS_LIST} and drops enrichers with
+     * conflicting output keys.
+     *
+     * @param configuration the configuration holding the enricher list; also passed to factories
+     * @param pluginManager the plugin manager used to discover enricher factories
+     * @return the validated enrichers; empty when no enricher name is configured
+     */
+    @VisibleForTesting
+    static Collection<FailureEnricher> getFailureEnrichers(
+            final Configuration configuration, final PluginManager pluginManager) {
+        final Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
+        // When empty, NO enrichers will be started.
+        if (includedEnrichers.isEmpty()) {
+            return Collections.emptySet();
+        }
+        final Iterator<FailureEnricherFactory> factoryIterator =
+                pluginManager.load(FailureEnricherFactory.class);
+        final Set<FailureEnricher> failureEnrichers = new HashSet<>();
+        while (factoryIterator.hasNext()) {
+            try {
+                final FailureEnricherFactory failureEnricherFactory = factoryIterator.next();
+                final FailureEnricher failureEnricher =
+                        failureEnricherFactory.createFailureEnricher(configuration);
+                // Configured names are matched against the enricher class, not the factory class.
+                final String enricherName = failureEnricher.getClass().getName();
+                if (includedEnrichers.contains(enricherName)) {
+                    failureEnrichers.add(failureEnricher);
+                    LOG.info(
+                            "Found failure enricher {} (factory {}) at {}.",
+                            enricherName,
+                            failureEnricherFactory.getClass().getName(),
+                            getCodeLocation(failureEnricher.getClass()));
+                } else {
+                    LOG.debug(
+                            "Excluding failure enricher {}, not configured in enricher list ({}).",
+                            enricherName,
+                            includedEnrichers);
+                }
+            } catch (Exception e) {
+                LOG.warn("Error while loading failure enricher factory.", e);
+            }
+        }
+
+        return filterInvalidEnrichers(failureEnrichers);
+    }
+
+    /**
+     * Best-effort lookup of the location a class was loaded from, used only for logging. Any
+     * failure yields a placeholder, so the lookup can never affect which enrichers get loaded.
+     */
+    private static String getCodeLocation(final Class<?> clazz) {
+        try {
+            if (clazz.getProtectionDomain().getCodeSource() == null) {
+                return UNKNOWN_LOCATION;
+            }
+            return new File(clazz.getProtectionDomain().getCodeSource().getLocation().toURI())
+                    .getCanonicalPath();
+        } catch (Exception e) {
+            LOG.debug("Could not determine the code location of {}.", clazz.getName(), e);
+            return UNKNOWN_LOCATION;
+        }
+    }
+
+    /**
+     * Returns a set of failure enricher names included in the given configuration.
+     *
+     * @param configuration the configuration to get the failure enricher names from
+     * @return failure enricher names
+     */
+    @VisibleForTesting
+    static Set<String> getIncludedFailureEnrichers(final Configuration configuration) {
+        final String includedEnrichersString =
+                configuration.getString(JobManagerOptions.FAILURE_ENRICHERS_LIST, "");
+        return ENRICHER_LIST_PATTERN
+                .splitAsStream(includedEnrichersString)
+                .filter(r -> !r.isEmpty())
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Filters out invalid {@link FailureEnricher} objects that have duplicate output keys.
+     *
+     * @param failureEnrichers a set of {@link FailureEnricher} objects to filter
+     * @return a filtered collection without any duplicate output keys
+     */
+    @VisibleForTesting
+    static Collection<FailureEnricher> filterInvalidEnrichers(
+            final Set<FailureEnricher> failureEnrichers) {
+        // Group enricher classes by every output key they declare.
+        final Map<String, Set<Class<?>>> enrichersByKey = new HashMap<>();
+        for (final FailureEnricher enricher : failureEnrichers) {
+            for (final String enricherKey : enricher.getOutputKeys()) {
+                enrichersByKey
+                        .computeIfAbsent(enricherKey, ignored -> new HashSet<>())
+                        .add(enricher.getClass());
+            }
+        }
+        // A key claimed by more than one enricher class invalidates all of those classes.
+        final Set<Class<?>> invalidEnrichers = new HashSet<>();
+        for (final Map.Entry<String, Set<Class<?>>> entry : enrichersByKey.entrySet()) {
+            if (entry.getValue().size() > 1) {
+                // SLF4J only substitutes {} placeholders, so the key must be passed explicitly.
+                LOG.warn(
+                        "Following enrichers have registered duplicate output key [{}] and will be ignored: {}.",
+                        entry.getKey(),
+                        entry.getValue().stream()
+                                .map(Class::getName)
+                                .collect(Collectors.joining(", ")));
+                invalidEnrichers.addAll(entry.getValue());
+            }
+        }
+        return failureEnrichers.stream()
+                .filter(enricher -> !invalidEnrichers.contains(enricher.getClass()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Enriches a Throwable by returning the merged label output of a Set of FailureEnrichers.
+     *
+     * <p>Labels whose key was not declared by the producing enricher are dropped. An enricher whose
+     * returned future completes exceptionally contributes no labels instead of failing the result.
+     *
+     * @param cause the Throwable to label
+     * @param context the context of the Throwable
+     * @param failureEnrichers a collection of FailureEnrichers to enrich the context with
+     * @return a CompletableFuture that will complete with a map of labels; it completes
+     *     exceptionally (caused by a {@link FlinkRuntimeException}) if two enrichers emit the same
+     *     key
+     */
+    public static CompletableFuture<Map<String, String>> labelFailure(
+            final Throwable cause,
+            final Context context,
+            final Collection<FailureEnricher> failureEnrichers) {
+        // list of CompletableFutures to enrich failure with labels from each enricher
+        final Collection<CompletableFuture<Map<String, String>>> enrichFutures = new ArrayList<>();
+
+        for (final FailureEnricher enricher : failureEnrichers) {
+            enrichFutures.add(
+                    enricher.processFailure(cause, context)
+                            .thenApply(labels -> filterValidLabels(enricher, labels))
+                            .exceptionally(
+                                    t -> {
+                                        // Isolate enricher failures: one faulty enricher must not
+                                        // discard the labels produced by all the others.
+                                        LOG.warn(
+                                                "Enricher {} threw an exception.",
+                                                enricher.getClass(),
+                                                t);
+                                        return Collections.emptyMap();
+                                    }));
+        }
+        // combine all CompletableFutures into a single CompletableFuture containing a Map of labels
+        return FutureUtils.combineAll(enrichFutures)
+                .thenApply(
+                        labelsToMerge -> {
+                            final Map<String, String> mergedLabels = new HashMap<>();
+                            for (Map<String, String> labels : labelsToMerge) {
+                                labels.forEach(
+                                        (k, v) ->
+                                                // merge label with existing, throwing an exception
+                                                // if there is a key conflict
+                                                mergedLabels.merge(
+                                                        k,
+                                                        v,
+                                                        (first, second) -> {
+                                                            throw new FlinkRuntimeException(
+                                                                    String.format(
+                                                                            MERGE_EXCEPTION_MSG,
+                                                                            k));
+                                                        }));
+                            }
+                            return mergedLabels;
+                        });
+    }
+
+    /**
+     * Keeps only the labels whose key the enricher declared in {@link
+     * FailureEnricher#getOutputKeys()}, logging and dropping every other label.
+     */
+    private static Map<String, String> filterValidLabels(
+            final FailureEnricher enricher, final Map<String, String> enricherLabels) {
+        final Set<String> allowedKeys = enricher.getOutputKeys();
+        final Map<String, String> validLabels = new HashMap<>();
+        enricherLabels.forEach(
+                (k, v) -> {
+                    if (!allowedKeys.contains(k)) {
+                        LOG.warn(
+                                "Ignoring label with key {} from enricher {}"
+                                        + " violating contract, keys allowed {}.",
+                                k,
+                                enricher.getClass(),
+                                allowedKeys);
+                    } else {
+                        validLabels.put(k, v);
+                    }
+                });
+        return validLabels;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
new file mode 100644
index 00000000000..d86b0e06e37
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failure;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricherFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.apache.flink.runtime.failure.FailureEnricherUtils.MERGE_EXCEPTION_MSG;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the {@link FailureEnricherUtils} class. */
+@ExtendWith(TestLoggerExtension.class)
+class FailureEnricherUtilsTest {
+
+    @Test
+    void testGetIncludedFailureEnrichers() {
+        Configuration conf = new Configuration();
+
+        // Disabled feature
+        conf.setString(JobManagerOptions.FAILURE_ENRICHERS_LIST, "");
+        Set<String> result = FailureEnricherUtils.getIncludedFailureEnrichers(conf);
+        assertThat(result).isEmpty();
+
+        // Single enricher
+        conf.setString(JobManagerOptions.FAILURE_ENRICHERS_LIST, "enricher1");
+        result = FailureEnricherUtils.getIncludedFailureEnrichers(conf);
+        assertThat(result).containsExactly("enricher1");
+
+        // Multiple enrichers with spaces
+        conf.setString(JobManagerOptions.FAILURE_ENRICHERS_LIST, "enricher1, enricher2, enricher3");
+        result = FailureEnricherUtils.getIncludedFailureEnrichers(conf);
+        assertThat(result).containsExactlyInAnyOrder("enricher1", "enricher2", "enricher3");
+
+        // Bad delimiter: the whole value is kept as a single, unsplit name
+        conf.setString(JobManagerOptions.FAILURE_ENRICHERS_LIST, "enricher1. enricher2. enricher3");
+        result = FailureEnricherUtils.getIncludedFailureEnrichers(conf);
+        assertThat(result).containsExactly("enricher1. enricher2. enricher3");
+
+        // Multiple enrichers with spaces and empty values; blank entries are dropped
+        conf.setString(
+                JobManagerOptions.FAILURE_ENRICHERS_LIST, "enricher1, ,enricher2,   enricher3");
+        result = FailureEnricherUtils.getIncludedFailureEnrichers(conf);
+        assertThat(result).containsExactlyInAnyOrder("enricher1", "enricher2", "enricher3");
+    }
+
+    @Test
+    void testGetFailureEnrichers() {
+        final Configuration configuration = new Configuration();
+        final Collection<FailureEnricher> emptyEnrichers =
+                FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager());
+
+        // Empty -> disabled feature
+        assertThat(emptyEnrichers).isEmpty();
+
+        // Invalid Name
+        configuration.set(
+                JobManagerOptions.FAILURE_ENRICHERS_LIST, FailureEnricherUtilsTest.class.getName());
+        final Collection<FailureEnricher> invalidEnrichers =
+                FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager());
+        // The loaded TestEnricher is not in the configured list, so it is excluded
+        assertThat(invalidEnrichers).isEmpty();
+
+        // Valid Name plus loading
+        configuration.set(JobManagerOptions.FAILURE_ENRICHERS_LIST, TestEnricher.class.getName());
+        final Collection<FailureEnricher> enrichers =
+                FailureEnricherUtils.getFailureEnrichers(configuration, createPluginManager());
+        // verify that exactly one failure enricher was created and returned
+        assertThat(enrichers).singleElement().isInstanceOf(TestEnricher.class);
+    }
+
+    @Test
+    void testGetValidatedEnrichers() {
+        // create two enrichers with non-overlapping keys
+        final FailureEnricher firstEnricher = new TestEnricher("key1");
+        final FailureEnricher secondEnricher = new TestEnricher("key2");
+
+        final Set<FailureEnricher> enrichers =
+                new HashSet<>(Arrays.asList(firstEnricher, secondEnricher));
+
+        final Collection<FailureEnricher> validatedEnrichers =
+                FailureEnricherUtils.filterInvalidEnrichers(enrichers);
+
+        // expect both enrichers to be valid
+        assertThat(validatedEnrichers).containsExactlyInAnyOrder(firstEnricher, secondEnricher);
+    }
+
+    @Test
+    void testValidatedEnrichersWithInvalidEntries() {
+        // create two enrichers with overlapping keys and a valid one -- must be different classes
+        final FailureEnricher validEnricher = new TestEnricher("validKey");
+        final FailureEnricher firstOverlapEnricher = new AnotherTestEnricher("key1", "key2");
+        final FailureEnricher secondOverlapEnricher = new AndAnotherTestEnricher("key2", "key3");
+
+        final Set<FailureEnricher> enrichers =
+                new HashSet<>(
+                        Arrays.asList(validEnricher, firstOverlapEnricher, secondOverlapEnricher));
+
+        final Collection<FailureEnricher> validatedEnrichers =
+                FailureEnricherUtils.filterInvalidEnrichers(enrichers);
+        // Only the enricher whose keys do not overlap with any other enricher survives
+        assertThat(validatedEnrichers).containsExactly(validEnricher);
+    }
+
+    @Test
+    void testLabelFutureWithValidEnricher() {
+        // validate labelFailure by enricher with correct outputKeys
+        final Throwable cause = new RuntimeException("test exception");
+        final Set<FailureEnricher> failureEnrichers = new HashSet<>();
+        final FailureEnricher validEnricher = new TestEnricher("enricherKey");
+        failureEnrichers.add(validEnricher);
+
+        final CompletableFuture<Map<String, String>> result =
+                FailureEnricherUtils.labelFailure(cause, null, failureEnrichers);
+
+        // Pin the exact entry rather than checking key and value independently
+        assertThatFuture(result)
+                .eventuallySucceeds()
+                .satisfies(
+                        labels ->
+                                assertThat(labels)
+                                        .hasSize(1)
+                                        .containsEntry("enricherKey", "enricherKeyValue"));
+    }
+
+    @Test
+    void testLabelFailureWithInvalidEnricher() {
+        // validate labelFailure by enricher with wrong outputKeys
+        final Throwable cause = new RuntimeException("test exception");
+        final String invalidEnricherKey = "invalidKey";
+        final Set<FailureEnricher> failureEnrichers = new HashSet<>();
+        final FailureEnricher invalidEnricher =
+                new TestEnricher(
+                        Collections.singletonMap(invalidEnricherKey, "enricherValue"),
+                        "enricherKey");
+        failureEnrichers.add(invalidEnricher);
+
+        final CompletableFuture<Map<String, String>> result =
+                FailureEnricherUtils.labelFailure(cause, null, failureEnrichers);
+        // The emitted key is not among the declared output keys, so the label must be ignored.
+        // The lambda has to assert: a bare `labels.isEmpty()` would silently discard its result.
+        assertThatFuture(result)
+                .eventuallySucceeds()
+                .satisfies(labels -> assertThat(labels).isEmpty());
+    }
+
+    @Test
+    void testLabelFailureMergeException() {
+        // labelFailure must fail when two enrichers emit the same label key
+        final Throwable cause = new RuntimeException("test failure");
+        final FailureEnricher firstEnricher = new TestEnricher("key1", "key2");
+        final FailureEnricher secondEnricher = new TestEnricher("key2", "key3");
+        final Set<FailureEnricher> enrichers =
+                new HashSet<>(Arrays.asList(firstEnricher, secondEnricher));
+
+        final CompletableFuture<Map<String, String>> result =
+                FailureEnricherUtils.labelFailure(cause, null, enrichers);
+
+        // Both enrichers emit "key2". assertThatThrownBy also fails the test if the future
+        // unexpectedly completes normally, which a bare try/catch would silently accept.
+        assertThatThrownBy(result::get)
+                .hasRootCauseInstanceOf(FlinkRuntimeException.class)
+                .hasMessageContaining(String.format(MERGE_EXCEPTION_MSG, "key2"));
+    }
+
+    /**
+     * Testing plugin manager for {@link FailureEnricherFactory} utilizing {@link
+     * TestFailureEnricherFactory}.
+     *
+     * @return the testing PluginManager
+     */
+    private static PluginManager createPluginManager() {
+        final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+        plugins.put(
+                FailureEnricherFactory.class,
+                IteratorUtils.singletonIterator(new TestFailureEnricherFactory()));
+        return new TestingPluginManager(plugins);
+    }
+
+    /** Factory implementation of {@link TestEnricher} used for plugin load testing. */
+    private static class TestFailureEnricherFactory implements FailureEnricherFactory {
+
+        @Override
+        public FailureEnricher createFailureEnricher(Configuration conf) {
+            return new TestEnricher();
+        }
+    }
+
+    /** Distinct {@link TestEnricher} subclass, so enrichers of different classes can coexist. */
+    private static class AndAnotherTestEnricher extends TestEnricher {
+        AndAnotherTestEnricher(String... outputKeys) {
+            super(outputKeys);
+        }
+    }
+
+    /** Distinct {@link TestEnricher} subclass, so enrichers of different classes can coexist. */
+    private static class AnotherTestEnricher extends TestEnricher {
+        AnotherTestEnricher(String... outputKeys) {
+            super(outputKeys);
+        }
+    }
+
+    /**
+     * Minimal {@link FailureEnricher} that declares a fixed set of output keys and always
+     * completes immediately with a fixed label map.
+     */
+    private static class TestEnricher implements FailureEnricher {
+        private final Set<String> outputKeys;
+        private final Map<String, String> outputMap;
+
+        /** Emits one label {@code key -> key + "Value"} for each declared output key. */
+        TestEnricher(String... outputKeys) {
+            this.outputKeys = Arrays.stream(outputKeys).collect(Collectors.toSet());
+            this.outputMap = new HashMap<>();
+            this.outputKeys.forEach(key -> outputMap.put(key, key + "Value"));
+        }
+
+        /**
+         * Declares {@code outputKeys} but emits {@code outputValues} as-is, which lets a test
+         * produce labels whose keys were never declared.
+         */
+        TestEnricher(Map<String, String> outputValues, String... outputKeys) {
+            this.outputKeys = Arrays.stream(outputKeys).collect(Collectors.toSet());
+            this.outputMap = outputValues;
+        }
+
+        @Override
+        public Set<String> getOutputKeys() {
+            return outputKeys;
+        }
+
+        @Override
+        public CompletableFuture<Map<String, String>> processFailure(
+                Throwable cause, Context context) {
+            return CompletableFuture.completedFuture(outputMap);
+        }
+    }
+}


Reply via email to