This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 51773abba1 [GOBBLIN-2211] Implement Error Classification based on 
execution issues (#4121)
51773abba1 is described below

commit 51773abba115e1bebec413662063286789c712db
Author: NamsB7 <[email protected]>
AuthorDate: Fri Jul 25 15:38:00 2025 +0530

    [GOBBLIN-2211] Implement Error Classification based on execution issues 
(#4121)
    
    Implemented Error Classification for jobs based on execution issues
    
    Added 2 new tables for ErrorPatternStore along with its in-memory implementation
---
 .../gobblin/configuration/ConfigurationKeys.java   |   8 +
 .../gobblin/configuration/ErrorCategory.java       |  58 +--
 .../gobblin/configuration/ErrorPatternProfile.java |  55 +-
 .../apache/gobblin/service/ServiceConfigKeys.java  |  10 +
 .../gobblin/metastore/ErrorPatternStore.java       |  63 +++
 .../metastore/InMemoryErrorPatternStore.java       | 180 +++++++
 .../gobblin/metastore/MysqlErrorPatternStore.java  | 370 ++++++++++++++
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 138 ++++-
 .../apache/gobblin/runtime/ErrorClassifier.java    | 251 +++++++++
 .../troubleshooter/InMemoryIssueRepository.java    |  11 +-
 .../InMemoryMultiContextIssueRepository.java       |  20 +-
 .../runtime/troubleshooter/IssueRepository.java    |   3 +
 .../troubleshooter/IssueTestDataProvider.java      | 564 +++++++++++++++++++++
 .../troubleshooter/JobIssueEventHandler.java       |  38 +-
 .../MultiContextIssueRepository.java               |   6 +
 .../troubleshooter/NoopIssueRepository.java        |   6 +
 .../gobblin/runtime/ErrorClassifierTest.java       | 184 +++++++
 .../troubleshooter/JobIssueEventHandlerTest.java   |   4 +-
 .../modules/core/GobblinServiceGuiceModule.java    |  11 +-
 .../MySqlMultiContextIssueRepository.java          |  47 ++
 .../monitoring/KafkaAvroJobStatusMonitor.java      |   5 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  59 ++-
 .../monitoring/KafkaJobStatusMonitorFactory.java   |   9 +-
 23 files changed, 1983 insertions(+), 117 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 03f1a2629e..195ca295ee 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -85,6 +85,14 @@ public class ConfigurationKeys {
 
   public static final String DATASETURN_STATESTORE_NAME_PARSER = 
"state.store.datasetUrnStateStoreNameParser";
 
+  /**
+   * Error Pattern Store related configuration properties.
+   */
+  public static final String ERROR_REGEX_DB_TABLE_KEY = 
"state.store.db.regexTableName";
+  public static final String ERROR_CATEGORIES_DB_TABLE_KEY = 
"state.store.db.categoriesTableName";
+  public static final String ERROR_REGEX_VARCHAR_SIZE_KEY = 
"error.regex.max.varchar.size";
+  public static final String ERROR_CATEGORY_VARCHAR_SIZE_KEY = 
"error.category.max.varchar.size";
+  public static final String DEFAULT_DB_ERROR_CATEGORY_KEY = 
"error.category.default";
   /**
    * Job scheduler configuration properties.
    */
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
 b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorCategory.java
similarity index 50%
copy from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
copy to 
gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorCategory.java
index f47f9085fb..43b927242c 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorCategory.java
@@ -15,47 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.configuration;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-
-public class NoopIssueRepository implements IssueRepository {
-  @Override
-  public List<Issue> getAll()
-      throws TroubleshooterException {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void put(Issue issue)
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void put(Collection<Issue> issues)
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void remove(String issueCode)
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void removeAll()
-      throws TroubleshooterException {
+import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
 
+/**
+ * Represents an error category used to classify errors within the Gobblin framework.
+ * Each category has a name and a priority, which can be used to determine the severity
+ * or importance of the error. This class is primarily used for error categorisation and handling.
+ */
+@Getter
+@Setter
+public class ErrorCategory implements Serializable {
+  private String categoryName;
+  private int priority;
+
+  public ErrorCategory(String categoryName, int priority) {
+    this.categoryName = categoryName;
+    this.priority = priority;
   }
 
-  @Override
-  public void replaceAll(Collection<Issue> issues)
-      throws TroubleshooterException {
-
-  }
 }
+
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorPatternProfile.java
similarity index 50%
copy from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
copy to 
gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorPatternProfile.java
index f47f9085fb..0b649b4ebf 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorPatternProfile.java
@@ -15,47 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.configuration;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.io.Serializable;
 
+import lombok.Getter;
+import lombok.Setter;
 
-public class NoopIssueRepository implements IssueRepository {
-  @Override
-  public List<Issue> getAll()
-      throws TroubleshooterException {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void put(Issue issue)
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void put(Collection<Issue> issues)
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void remove(String issueCode)
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void removeAll()
-      throws TroubleshooterException {
-
-  }
-
-  @Override
-  public void replaceAll(Collection<Issue> issues)
-      throws TroubleshooterException {
-
+/**
+ * Represents a profile for error patterns, containing a regex to match error descriptions and a category name for classification.
+ */
+@Getter
+@Setter
+public class ErrorPatternProfile implements Serializable {
+  private String descriptionRegex;
+  private String categoryName;
+
+  public ErrorPatternProfile(String descriptionRegex, String categoryName) {
+    this.descriptionRegex = descriptionRegex;
+    this.categoryName = categoryName;
   }
 }
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 8b6c1df037..f18566c22f 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -160,4 +160,14 @@ public class ServiceConfigKeys {
   public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
   public static final long DEFAULT_FLOW_FINISH_DEADLINE_MILLIS = 
TimeUnit.HOURS.toMillis(24);
+
+  public static final String ERROR_PATTERN_STORE_CLASS = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "errorPatternStore.class";
+  public static final String ERROR_CLASSIFICATION_ENABLED_KEY = 
"errorClassification.enabled";
+  public static final String ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL_KEY = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX 
+"errorClassification.maxErrorsInFinal";
+  public static final int DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL = 
10;
+  public static final String ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS_KEY = 
GOBBLIN_SERVICE_PREFIX + "errorClassification.maxErrorsToProcess";
+  public static final int DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS = 
1000;
+  public static final String ERROR_CLASSIFICATION_DEFAULT_PRIORITY_KEY = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + 
"errorClassification.defaultPriorityValue";
+  public static final int DEFAULT_PRIORITY_VALUE = Integer.MAX_VALUE;
+  public static final String ERROR_PATTERN_STORE_DEFAULT_CATEGORY_KEY = 
GOBBLIN_SERVICE_PREFIX + "errorPatternStore.defaultCategory";
 }
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/ErrorPatternStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/ErrorPatternStore.java
new file mode 100644
index 0000000000..95e8fcdbda
--- /dev/null
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/ErrorPatternStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * Interface for a store that persists Errors and Categories.
+ **/
+public interface ErrorPatternStore {
+  void addErrorPattern(ErrorPatternProfile issue)
+      throws IOException;
+
+  boolean deleteErrorPattern(String descriptionRegex)
+      throws IOException;
+
+  ErrorPatternProfile getErrorPattern(String descriptionRegex)
+      throws IOException;
+
+  List<ErrorPatternProfile> getAllErrorPatterns()
+      throws IOException;
+
+  List<ErrorPatternProfile> getErrorPatternsByCategory(String categoryName)
+      throws IOException;
+
+  void addErrorCategory(ErrorCategory errorCategory)
+      throws IOException;
+
+  ErrorCategory getErrorCategory(String categoryName)
+      throws IOException;
+
+  int getErrorCategoryPriority(String categoryName)
+      throws IOException;
+
+  List<ErrorCategory> getAllErrorCategories()
+      throws IOException;
+
+  List<ErrorPatternProfile> getAllErrorPatternsOrderedByCategoryPriority()
+      throws IOException;
+
+  ErrorCategory getDefaultCategory()
+      throws IOException;
+}
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/InMemoryErrorPatternStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/InMemoryErrorPatternStore.java
new file mode 100644
index 0000000000..9be6070e26
--- /dev/null
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/InMemoryErrorPatternStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An in-memory implementation of the ErrorPatternStore interface.
+ * This implementation is intended as a (default) in-memory store and does not persist data across application restarts.
+ **/
+@Slf4j
+public class InMemoryErrorPatternStore implements ErrorPatternStore {
+  private final List<ErrorPatternProfile> errorPatterns = new ArrayList<>();
+  private final Map<String, ErrorCategory> categories = new HashMap<>();
+  private ErrorCategory defaultErrorCategory = null;
+
+  private static final String DEFAULT_CATEGORY_NAME = "UNKNOWN";
+
+  private final int defaultPriority;
+
+  @Inject
+  public InMemoryErrorPatternStore(Config config) {
+    ErrorCategory user = new ErrorCategory("USER", 1);
+    this.categories.put(user.getCategoryName(), user);
+
+    this.errorPatterns.add(new ErrorPatternProfile(".*file not found.*", 
"USER"));
+    defaultPriority = ConfigUtils.getInt(config, 
ServiceConfigKeys.ERROR_CLASSIFICATION_DEFAULT_PRIORITY_KEY,
+        ServiceConfigKeys.DEFAULT_PRIORITY_VALUE);
+
+    String defaultErrorCategoryName =ConfigUtils.getString(config, 
ServiceConfigKeys.ERROR_PATTERN_STORE_DEFAULT_CATEGORY_KEY, 
DEFAULT_CATEGORY_NAME);
+
+    this.defaultErrorCategory = new ErrorCategory(defaultErrorCategoryName, 
defaultPriority);
+  }
+
+  public void upsertCategory(List<ErrorCategory> categories) {
+    for (ErrorCategory errorCategory : categories) {
+      this.categories.put(errorCategory.getCategoryName(), errorCategory);
+    }
+  }
+
+  public void upsertPatterns(List<ErrorPatternProfile> patterns) {
+    // Clear existing patterns and add all new ones
+    this.errorPatterns.clear();
+    this.errorPatterns.addAll(patterns);
+  }
+
+  public void setDefaultCategory(ErrorCategory errorCategory) {
+    this.defaultErrorCategory = errorCategory;
+  }
+
+  @Override
+  public void addErrorPattern(ErrorPatternProfile issue)
+      throws IOException {
+    errorPatterns.add(issue);
+  }
+
+  @Override
+  public boolean deleteErrorPattern(String descriptionRegex)
+      throws IOException {
+    return errorPatterns.removeIf(issue -> 
issue.getDescriptionRegex().equals(descriptionRegex));
+  }
+
+  @Override
+  public ErrorPatternProfile getErrorPattern(String descriptionRegex)
+      throws IOException {
+    for (ErrorPatternProfile issue : errorPatterns) {
+      if (issue.getDescriptionRegex().equals(descriptionRegex)) {
+        return issue;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public List<ErrorPatternProfile> getAllErrorPatterns()
+      throws IOException {
+    return new ArrayList<>(errorPatterns);
+  }
+
+  @Override
+  public List<ErrorPatternProfile> getErrorPatternsByCategory(String 
categoryName)
+      throws IOException {
+    List<ErrorPatternProfile> result = new ArrayList<>();
+    for (ErrorPatternProfile issue : errorPatterns) {
+      if (issue.getCategoryName() != null && 
issue.getCategoryName().equals(categoryName)) {
+        result.add(issue);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void addErrorCategory(ErrorCategory errorCategory)
+      throws IOException {
+    if (errorCategory != null) {
+      categories.put(errorCategory.getCategoryName(), errorCategory);
+    }
+  }
+
+  @Override
+  public ErrorCategory getErrorCategory(String categoryName)
+      throws IOException {
+    return categories.get(categoryName);
+  }
+
+  @Override
+  public int getErrorCategoryPriority(String categoryName)
+      throws IOException {
+    ErrorCategory errorCategory = getErrorCategory(categoryName);
+    if (errorCategory != null) {
+      return errorCategory.getPriority();
+    }
+    throw new IOException("ErrorCategory not found: " + categoryName);
+  }
+
+  @Override
+  public List<ErrorCategory> getAllErrorCategories()
+      throws IOException {
+    return new ArrayList<>(categories.values());
+  }
+
+  @Override
+  public ErrorCategory getDefaultCategory()
+      throws IOException {
+    if (defaultErrorCategory == null) {
+      defaultErrorCategory = new ErrorCategory(DEFAULT_CATEGORY_NAME, 
defaultPriority);
+    }
+    return defaultErrorCategory;
+  }
+
+  @Override
+  public List<ErrorPatternProfile> 
getAllErrorPatternsOrderedByCategoryPriority()
+      throws IOException {
+    errorPatterns.sort((issue1, issue2) -> {
+      ErrorCategory cat1 = categories.get(issue1.getCategoryName());
+      ErrorCategory cat2 = categories.get(issue2.getCategoryName());
+      if (cat1 == null && cat2 == null) {
+        return 0;
+      }
+      if (cat1 == null) {
+        return 1;
+      }
+      if (cat2 == null) {
+        return -1;
+      }
+      return Integer.compare(cat1.getPriority(), cat2.getPriority());
+    });
+    return errorPatterns;
+  }
+}
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlErrorPatternStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlErrorPatternStore.java
new file mode 100644
index 0000000000..e63d419d2c
--- /dev/null
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlErrorPatternStore.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.configuration.ErrorCategory;
+
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+import javax.sql.DataSource;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * MySQL-backed implementation of ErrorPatternStore.
+ *
+ * This class provides methods to primarily retrieve error regex patterns and 
error categories.
+ * There are also methods to add, delete, and retrieve error patterns and categories; add/delete should be used with caution.
+ *
+ * Expected table schemas:
+ *
+ * 1. error_summary_regex_store
+ *    - description_regex: VARCHAR(255) NOT NULL UNIQUE
+ *    - error_category_name: VARCHAR(255) NOT NULL
+ *
+ * 2. error_categories
+ *    - error_category_name: VARCHAR(255) PRIMARY KEY
+ *    - priority: INT UNIQUE NOT NULL
+ *
+ * Default Category Configuration:
+ * - The default category can be configured via the property: error.category.default
+ * - The configured value must exactly match an existing error_category_name in the error_categories table
+ * - If no default is configured, the category with the lowest priority (highest priority value) is used as a fallback
+ * - If a configured default category is not found in the database, an IOException will be thrown
+ *
+ **/
+@Slf4j
+public class MysqlErrorPatternStore implements ErrorPatternStore {
+  private final DataSource dataSource;
+  private final String errorRegexSummaryStoreTable;
+  private final String errorCategoriesTable;
+  public static final String CONFIG_PREFIX = "MysqlErrorPatternStore";
+
+  private static final int DEFAULT_MAX_CHARACTERS_IN_SQL_DESCRIPTION_REGEX = 
300;
+  private static final int DEFAULT_MAX_CHARACTERS_IN_SQL_CATEGORY_NAME = 255;
+  private final int maxCharactersInSqlDescriptionRegex;
+  private final int maxCharactersInSqlCategoryName;
+
+  private static final String CREATE_ERROR_REGEX_SUMMARY_STORE_TABLE_STATEMENT 
=
+      "CREATE TABLE IF NOT EXISTS %s (description_regex VARCHAR(%d) NOT NULL, 
error_category_name VARCHAR(%d) NOT NULL)";
+
+  private static final String CREATE_ERROR_CATEGORIES_TABLE_NAME =
+      "CREATE TABLE IF NOT EXISTS %s (error_category_name VARCHAR(%d) PRIMARY 
KEY, priority INT UNIQUE NOT NULL)";
+
+  private static final String INSERT_ERROR_CATEGORY_STATEMENT = "INSERT INTO 
%s (error_category_name, priority) "
+      + "VALUES (?, ?) ON DUPLICATE KEY UPDATE priority=VALUES(priority)";
+
+  private static final String GET_ERROR_CATEGORY_STATEMENT =
+      "SELECT error_category_name, priority FROM %s WHERE error_category_name 
= ?";
+
+  private static final String GET_ALL_ERROR_CATEGORIES_STATEMENT =
+      "SELECT error_category_name, priority FROM %s ORDER BY priority ASC";
+
+  private static final String INSERT_ERROR_REGEX_SUMMARY_STATEMENT =
+      "INSERT INTO %s (description_regex, error_category_name) "
+          + "VALUES (?, ?) ON DUPLICATE KEY UPDATE 
error_category_name=VALUES(error_category_name)";
+
+  private static final String DELETE_ERROR_REGEX_SUMMARY_STATEMENT = "DELETE 
FROM %s WHERE description_regex = ?";
+
+  private static final String GET_ERROR_REGEX_SUMMARY_STATEMENT =
+      "SELECT description_regex, error_category_name FROM %s WHERE 
description_regex   = ?";
+
+  private static final String GET_ERROR_PATTERN_BY_CATEGORY_STATEMENT = 
"SELECT description_regex, error_category_name FROM %s "
+      + " WHERE error_category_name = ?";
+
+  private static final String GET_ALL_ERROR_REGEX_SUMMARIES_STATEMENT =
+      "SELECT description_regex, error_category_name FROM %s";
+
+  // This SQL statement retrieves the category with the lowest priority 
(highest priority value).
+  private static final String GET_LOWEST_PRIORITY_ERROR_CATEGORY_STATEMENT =
+      "SELECT error_category_name, priority FROM %s ORDER BY priority DESC 
LIMIT 1";
+
+  private static final String 
GET_ALL_ERROR_ISSUES_ORDERED_BY_CATEGORY_PRIORITY_STATEMENT =
+      "SELECT e.description_regex, e.error_category_name FROM %s e "
+          + "JOIN %s c ON e.error_category_name = c.error_category_name "
+          + "ORDER BY c.priority ASC";
+
+  private final String configuredDefaultCategoryName;
+
+  @Inject
+  public MysqlErrorPatternStore(Config config)
+      throws IOException {
+    if (config.hasPath(CONFIG_PREFIX)) {
+      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    } else {
+      throw new IOException("Please specify the config for 
MysqlErrorPatternStore");
+    }
+    this.errorRegexSummaryStoreTable =
+        ConfigUtils.getString(config, 
ConfigurationKeys.ERROR_REGEX_DB_TABLE_KEY, "error_summary_regex_store");
+    this.errorCategoriesTable =
+        ConfigUtils.getString(config, 
ConfigurationKeys.ERROR_CATEGORIES_DB_TABLE_KEY, "error_categories");
+    this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+
+    this.maxCharactersInSqlDescriptionRegex = ConfigUtils.getInt(config, 
ConfigurationKeys.ERROR_REGEX_VARCHAR_SIZE_KEY,
+        DEFAULT_MAX_CHARACTERS_IN_SQL_DESCRIPTION_REGEX);
+
+    this.maxCharactersInSqlCategoryName = ConfigUtils.getInt(config, 
ConfigurationKeys.ERROR_CATEGORY_VARCHAR_SIZE_KEY,
+        DEFAULT_MAX_CHARACTERS_IN_SQL_CATEGORY_NAME);
+
+    // Default category name is read from ConfigurationKeys.DEFAULT_DB_ERROR_CATEGORY_KEY ("error.category.default")
+    this.configuredDefaultCategoryName =
+        ConfigUtils.getString(config, 
ConfigurationKeys.DEFAULT_DB_ERROR_CATEGORY_KEY, null);
+
+    createTablesIfNotExist();
+  }
+
+  private void createTablesIfNotExist()
+      throws IOException {
+    try (Connection connection = dataSource.getConnection()) {
+      try (PreparedStatement createRegexTable = connection.prepareStatement(
+          String.format(CREATE_ERROR_REGEX_SUMMARY_STORE_TABLE_STATEMENT, 
errorRegexSummaryStoreTable,
+              maxCharactersInSqlDescriptionRegex, 
maxCharactersInSqlCategoryName))) {
+        createRegexTable.executeUpdate();
+      }
+
+      // Create error_categories table
+      try (PreparedStatement createCategoriesTable = 
connection.prepareStatement(
+          String.format(CREATE_ERROR_CATEGORIES_TABLE_NAME, 
errorCategoriesTable, maxCharactersInSqlCategoryName))) {
+        createCategoriesTable.executeUpdate();
+      }
+
+      // Commit both changes together
+      connection.commit();
+    } catch (SQLException e) {
+      throw new IOException("Failed to create tables for storing 
ErrorPatterns", e);
+    }
+  }
+
+  @Override
+  public void addErrorCategory(ErrorCategory errorCategory)
+      throws IOException {
+    String sql = String.format(INSERT_ERROR_CATEGORY_STATEMENT, 
errorCategoriesTable);
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(sql)) {
+      ps.setString(1, errorCategory.getCategoryName());
+      ps.setInt(2, errorCategory.getPriority());
+      ps.executeUpdate();
+      conn.commit();
+    } catch (SQLException e) {
+      throw new IOException("Failed to add errorCategory", e);
+    }
+  }
+
+  @Override
+  public ErrorCategory getErrorCategory(String categoryName)
+      throws IOException {
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(GET_ERROR_CATEGORY_STATEMENT, errorCategoriesTable))) {
+      ps.setString(1, categoryName);
+      try (ResultSet rs = ps.executeQuery()) {
+        if (rs.next()) {
+          return new ErrorCategory(rs.getString(1), rs.getInt(2));
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get category", e);
+    }
+    return null;
+  }
+
+  @Override
+  public int getErrorCategoryPriority(String categoryName)
+      throws IOException {
+    ErrorCategory cat = getErrorCategory(categoryName);
+    if (cat == null) {
+      throw new IOException("ErrorCategory not found: " + categoryName);
+    }
+    return cat.getPriority();
+  }
+
+  @Override
+  public List<ErrorCategory> getAllErrorCategories()
+      throws IOException {
+    List<ErrorCategory> categories = new ArrayList<>();
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(GET_ALL_ERROR_CATEGORIES_STATEMENT, 
errorCategoriesTable)); ResultSet rs = ps.executeQuery()) {
+      while (rs.next()) {
+        categories.add(new ErrorCategory(rs.getString(1), rs.getInt(2)));
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get all categories", e);
+    }
+    return categories;
+  }
+
+  @Override
+  public void addErrorPattern(ErrorPatternProfile issue)
+      throws IOException {
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(INSERT_ERROR_REGEX_SUMMARY_STATEMENT, 
errorRegexSummaryStoreTable))) {
+      ps.setString(1, issue.getDescriptionRegex());
+      ps.setString(2, issue.getCategoryName());
+      ps.executeUpdate();
+      conn.commit();
+    } catch (SQLException e) {
+      throw new IOException("Failed to add issue", e);
+    }
+  }
+
+  @Override
+  public boolean deleteErrorPattern(String descriptionRegex)
+      throws IOException {
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(DELETE_ERROR_REGEX_SUMMARY_STATEMENT, 
errorRegexSummaryStoreTable))) {
+      ps.setString(1, descriptionRegex);
+      int rows = ps.executeUpdate();
+      conn.commit();
+      return rows > 0;
+    } catch (SQLException e) {
+      throw new IOException("Failed to delete issue", e);
+    }
+  }
+
+  @Override
+  public ErrorPatternProfile getErrorPattern(String descriptionRegex)
+      throws IOException {
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(GET_ERROR_REGEX_SUMMARY_STATEMENT, 
errorRegexSummaryStoreTable))) {
+      ps.setString(1, descriptionRegex);
+      try (ResultSet rs = ps.executeQuery()) {
+        if (rs.next()) {
+          return new ErrorPatternProfile(rs.getString(1), rs.getString(2));
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get issue", e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<ErrorPatternProfile> getAllErrorPatterns()
+      throws IOException {
+    List<ErrorPatternProfile> issues = new ArrayList<>();
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(GET_ALL_ERROR_REGEX_SUMMARIES_STATEMENT, 
errorRegexSummaryStoreTable));
+        ResultSet rs = ps.executeQuery()) {
+      while (rs.next()) {
+        issues.add(new ErrorPatternProfile(rs.getString(1), rs.getString(2)));
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get all issues", e);
+    }
+    return issues;
+  }
+
+  @Override
+  public List<ErrorPatternProfile> getErrorPatternsByCategory(String 
categoryName)
+      throws IOException {
+    List<ErrorPatternProfile> issues = new ArrayList<>();
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(GET_ERROR_PATTERN_BY_CATEGORY_STATEMENT, 
errorRegexSummaryStoreTable)
+    )) {
+      ps.setString(1, categoryName);
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          issues.add(new ErrorPatternProfile(rs.getString(1), 
rs.getString(2)));
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get issues by category", e);
+    }
+    return issues;
+  }
+
+  @Override
+  public ErrorCategory getDefaultCategory()
+      throws IOException {
+    // 1. Try to use the configured default category name if set
+    if (StringUtils.isNotBlank(configuredDefaultCategoryName)) {
+      ErrorCategory cat = getErrorCategory(configuredDefaultCategoryName);
+      if (cat != null) {
+        return cat;
+      } else {
+        // Throw exception if configured category doesn't exist
+        throw new IOException(String.format(
+            "Configured default category '%s' not found in database",
+            configuredDefaultCategoryName));
+      }
+    }
+
+    // 2. No configuration provided - use lowest priority category as fallback
+    ErrorCategory lowestPriorityCategory = getLowestPriorityCategory();
+    if (lowestPriorityCategory == null) {
+      throw new IOException("No error categories found in database");
+    }
+
+    log.info("No default category configured, using lowest priority category: 
{}",
+        lowestPriorityCategory.getCategoryName());
+    return lowestPriorityCategory;
+  }
+
+  /**
+   * Returns the category with the lowest priority, i.e. the highest priority 
value (descending order).
+   */
+  private ErrorCategory getLowestPriorityCategory()
+      throws IOException {
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(
+        String.format(GET_LOWEST_PRIORITY_ERROR_CATEGORY_STATEMENT, 
errorCategoriesTable))) {
+      try (ResultSet rs = ps.executeQuery()) {
+        if (rs.next()) {
+          return new ErrorCategory(rs.getString(1), rs.getInt(2));
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get category", e);
+    }
+    return null;
+  }
+
+  /**
+   * Returns all ErrorIssues ordered by the priority of their category 
(ascending), then by description_regex.
+   */
+  @Override
+  public List<ErrorPatternProfile> 
getAllErrorPatternsOrderedByCategoryPriority()
+      throws IOException {
+    List<ErrorPatternProfile> issues = new ArrayList<>();
+    String sql = 
String.format(GET_ALL_ERROR_ISSUES_ORDERED_BY_CATEGORY_PRIORITY_STATEMENT, 
errorRegexSummaryStoreTable,
+        errorCategoriesTable);
+    try (Connection conn = dataSource.getConnection(); PreparedStatement ps = 
conn.prepareStatement(sql);
+        ResultSet rs = ps.executeQuery()) {
+      while (rs.next()) {
+        issues.add(new ErrorPatternProfile(rs.getString(1), rs.getString(2)));
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to get all issues ordered by category 
priority", e);
+    }
+    return issues;
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 06fb1f9dbc..ebdbbb73ca 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -34,10 +34,12 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
@@ -55,10 +57,12 @@ import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.metastore.InMemoryErrorPatternStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.FlowStatus;
 import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
@@ -72,9 +76,12 @@ import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
 import org.apache.gobblin.metrics.kafka.Pusher;
 import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.IssueTestDataProvider;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
@@ -86,7 +93,6 @@ import 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProduc
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -105,6 +111,8 @@ public class KafkaAvroJobStatusMonitorTest {
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
   DagActionStore.DagAction enforceJobStartDeadlineDagAction;
+  private JobIssueEventHandler eventHandler;
+  private ErrorClassifier errorClassifier;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -128,6 +136,12 @@ public class KafkaAvroJobStatusMonitorTest {
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
   }
 
  @BeforeMethod
  public void setUpMethod() {
    // Re-create the mocks before every test so Mockito invocation counts and
    // stubbings from one test cannot leak into the next.
    eventHandler = Mockito.mock(JobIssueEventHandler.class);
    errorClassifier = Mockito.mock(ErrorClassifier.class);
  }
+
   @Test
   public void testProcessMessageForSuccessfulFlow() throws IOException, 
ReflectiveOperationException {
     DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
@@ -152,7 +166,7 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-      new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
+        new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
@@ -194,6 +208,115 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
+  /**
+   * Functional test for ErrorClassifier integration with 
KafkaJobStatusMonitor:
+   * Tests that when a job fails, the monitor properly invokes ErrorClassifier 
to classify issues
+   * and logs the final classified error through the JobIssueEventHandler.
+   */
+  @Test (dependsOnMethods = "testObservabilityEventFlowFailed")
+  public void testErrorClassifierFinalIssueSystemInfra()
+      throws IOException, ReflectiveOperationException, 
TroubleshooterException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
+    Config config = 
ConfigFactory.empty().withValue(ServiceConfigKeys.ERROR_CLASSIFICATION_ENABLED_KEY,
 ConfigValueFactory.fromAnyRef(Boolean.TRUE));
+    List<ErrorPatternProfile> errorPatternProfiles = 
IssueTestDataProvider.getSortedPatterns();
+
+    InMemoryErrorPatternStore store = new InMemoryErrorPatternStore(config);
+    store.upsertCategory(IssueTestDataProvider.TEST_CATEGORIES);
+    store.upsertPatterns(errorPatternProfiles);
+    
store.setDefaultCategory(IssueTestDataProvider.TEST_DEFAULT_ERROR_CATEGORY);
+
+    this.errorClassifier =  new ErrorClassifier(store, config);
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createJobFailedEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    //Set issues for error classification
+    
Mockito.doReturn(IssueTestDataProvider.testSystemInfraCategoryIssues()).when(eventHandler).getErrorListForClassification(Mockito.any());
+
+    try {
+      Thread.sleep(1000L);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        config, new NoopGaaSJobObservabilityEventProducer(), 
dagManagementStateStore);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+
+    ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+
+    // Verify that the JobIssueEventHandler was called with the final 
classified error
+    Mockito.verify(eventHandler, Mockito.times(1))
+        .logFinalError(Mockito.argThat(issue ->
+                issue.getSummary().contains("SYSTEM INFRA")),
+            Mockito.eq(this.flowName),
+            Mockito.eq(this.flowGroup),
+            Mockito.eq(String.valueOf(this.flowExecutionId)),
+            Mockito.eq(this.jobName));
+
+    jobStatusMonitor.shutDown();
+  }
+
+  /**
+   * Test for verifying that error classifier module is not called when it is 
disabled.
+   */
+  @Test (dependsOnMethods = "testErrorClassifierFinalIssueSystemInfra")
+  public void testDisabledErrorClassification()
+      throws IOException, ReflectiveOperationException, 
TroubleshooterException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
+    Config config = 
ConfigFactory.empty().withValue(ServiceConfigKeys.ERROR_CLASSIFICATION_ENABLED_KEY,
 ConfigValueFactory.fromAnyRef(Boolean.FALSE));
+    List<ErrorPatternProfile> errorPatternProfiles = 
IssueTestDataProvider.getSortedPatterns();
+
+    InMemoryErrorPatternStore store = new InMemoryErrorPatternStore(config);
+    store.upsertCategory(IssueTestDataProvider.TEST_CATEGORIES);
+    store.upsertPatterns(errorPatternProfiles);
+    
store.setDefaultCategory(IssueTestDataProvider.TEST_DEFAULT_ERROR_CATEGORY);
+
+    this.errorClassifier =  new ErrorClassifier(store, config);
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createJobFailedEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    //Set issues for error classification
+    
Mockito.doReturn(IssueTestDataProvider.testSystemInfraCategoryIssues()).when(eventHandler).getErrorListForClassification(Mockito.any());
+
+    try {
+      Thread.sleep(1000L);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        config, new NoopGaaSJobObservabilityEventProducer(), 
dagManagementStateStore);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+
+    ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+
+    // Verify that the JobIssueEventHandler was not called for error 
classification
+    Mockito.verify(eventHandler, Mockito.times(0))
+        .logFinalError(Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any(), Mockito.any());
+    Mockito.verify(eventHandler, Mockito.times(0))
+        .getErrorListForClassification(Mockito.any());
+
+    jobStatusMonitor.shutDown();
+  }
+
   @Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
   public void testProcessMessageForFailedFlow() throws IOException, 
ReflectiveOperationException {
     DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
@@ -309,7 +432,7 @@ public class KafkaAvroJobStatusMonitorTest {
         createFlowCompiledEvent(),
         createJobOrchestratedEvent(1, 2),
         createJobSkippedEvent()
-        ).forEach(event -> {
+    ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
@@ -950,14 +1073,15 @@ public class KafkaAvroJobStatusMonitorTest {
   }
 
  /**
   * Builds a {@link MockKafkaAvroJobStatusMonitor} wired with the test's current
   * {@code errorClassifier} and {@code eventHandler} mocks/instances.
   * Entries in {@code additionalConfig} take precedence over the base settings below
   * because the base config falls back to it via {@code withFallback}.
   */
  MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
      GaaSJobObservabilityEventProducer eventProducer, DagManagementStateStore dagManagementStateStore)
      throws IOException, ReflectiveOperationException {
    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"))
        .withFallback(additionalConfig);
    return new MockKafkaAvroJobStatusMonitor("test", config, 1, shouldThrowFakeExceptionInParseJobStatusToggle,
        eventProducer, dagManagementStateStore, errorClassifier, eventHandler);
  }
   /**
    *   Create a dummy event to test if it is filtered out by the consumer.
@@ -1013,9 +1137,9 @@ public class KafkaAvroJobStatusMonitorTest {
      */
    public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, GaaSJobObservabilityEventProducer producer,
        DagManagementStateStore dagManagementStateStore, ErrorClassifier errorClassifier, JobIssueEventHandler eventHandler)
        throws IOException, ReflectiveOperationException {
      // The handler and classifier are injected by the caller (rather than created as fresh
      // mocks here) so individual tests can stub and verify interactions on them.
      super(topic, config, numThreads, eventHandler, producer, dagManagementStateStore, errorClassifier);
      shouldThrowFakeExceptionInParseJobStatus = shouldThrowFakeExceptionInParseJobStatusToggle;
    }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ErrorClassifier.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ErrorClassifier.java
new file mode 100644
index 0000000000..d0f5fde65c
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ErrorClassifier.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.metastore.ErrorPatternStore;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Classifies issues by matching their summary description to error patterns 
and categories.
+ * Categorisation is based on regex patterns and their associated categories.
+ * Each category has an associated priority value.
+ */
+@Slf4j
+public class ErrorClassifier {
+  private final List<CompiledErrorPattern> errorIssues;
+  private final Map<String, ErrorCategory> categoryMap;
+  private ErrorPatternStore errorStore = null;
+
+  private final int maxErrorsInFinalIssue;
+  private static final String FINAL_ISSUE_ERROR_CODE = "T0000";
+  private ErrorCategory defaultErrorCategory = null;
+
+  /**
+   * Loads all error issues and categories from the store into memory.
+   */
+  @Inject
+  public ErrorClassifier(ErrorPatternStore store, Config config)
+      throws IOException {
+    this.errorStore = store;
+
+    this.maxErrorsInFinalIssue =
+        ConfigUtils.getInt(config, 
ServiceConfigKeys.ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL_KEY,
+            
ServiceConfigKeys.DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL);
+
+    this.categoryMap = new HashMap<>();
+    for (ErrorCategory cat : this.errorStore.getAllErrorCategories()) {
+      categoryMap.put(cat.getCategoryName(), cat);
+    }
+
+    this.errorIssues = new ArrayList<>();
+    for (ErrorPatternProfile errorPattern : 
this.errorStore.getAllErrorPatternsOrderedByCategoryPriority()) {
+      errorIssues.add(new CompiledErrorPattern(errorPattern));
+    }
+
+    try {
+      this.defaultErrorCategory = this.errorStore.getDefaultCategory();
+    }
+    catch (IOException e) {
+      log.warn("Failed to load default error category from store, will not use 
default category for classification", e);
+      this.defaultErrorCategory = null;
+    }
+  }
+
+
+  /**
+   * Classifies a list of issues and returns the highest priority category 
with its matched issues.
+   * If no issues match, returns null.
+   * If defaultErrorCategory is set, it will be used for unmatched issues.
+   */
+  public Issue classifyEarlyStopWithDefault(List<Issue> issues) {
+    if (issues == null || issues.isEmpty()) {
+      return null;
+    }
+
+    ClassificationResult result = performClassification(issues);
+
+    if (result.highestCategoryName == null) {
+      return null;
+    }
+
+    return buildFinalIssue(result.highestCategoryName, 
result.categoryToIssues);
+  }
+
+  private ClassificationResult performClassification(List<Issue> issues) {
+    ClassificationResult result = new ClassificationResult();
+
+    for (Issue issue : issues) {
+      classifySingleIssue(issue, result);
+    }
+
+    applyDefaultCategoryIfNeeded(result);
+    return result;
+  }
+
+  private void classifySingleIssue(Issue issue, ClassificationResult result) {
+    ErrorCategory matchedErrorCategory = findBestMatchingCategory(issue, 
result.highestPriority);
+
+    if (matchedErrorCategory != null) {
+      addMatchedIssue(issue, matchedErrorCategory, result);
+    } else {
+      addUnmatchedIssue(issue, result);
+    }
+  }
+
+  private ErrorCategory findBestMatchingCategory(Issue issue, Integer 
currentHighestPriority) {
+    for (CompiledErrorPattern pei : errorIssues) {
+      ErrorCategory cat = categoryMap.get(pei.getCategoryName());
+      if (cat == null) {
+        continue;
+      }
+
+      // Early stop optimization - skip categories with lower priority
+      if (currentHighestPriority != null && cat.getPriority() > 
currentHighestPriority) {
+        break;
+      }
+
+      if (pei.matches(issue.getSummary())) {
+        return cat;
+      }
+    }
+    return null;
+  }
+
+  private void addMatchedIssue(Issue issue, ErrorCategory errorCategory, 
ClassificationResult result) {
+    result.categoryToIssues.computeIfAbsent(errorCategory.getCategoryName(), k 
-> new ArrayList<>()).add(issue);
+
+    updateHighestPriorityIfNeeded(errorCategory, result);
+  }
+
+  private void addUnmatchedIssue(Issue issue, ClassificationResult result) {
+    result.unmatched.add(issue);
+
+    // Initialize default priority only once when we encounter the first 
unmatched issue
+    if (result.defaultPriority == null && defaultErrorCategory != null) {
+      result.defaultPriority = defaultErrorCategory.getPriority();
+      // Only update highest priority if no category has been matched yet OR 
if default category has higher priority (lower number)
+      if (result.highestPriority == null || defaultErrorCategory.getPriority() 
< result.highestPriority) {
+        result.highestPriority = result.defaultPriority;
+        result.highestCategoryName = defaultErrorCategory.getCategoryName();
+      }
+    }
+  }
+
+  private void updateHighestPriorityIfNeeded(ErrorCategory errorCategory, 
ClassificationResult result) {
+    if (result.highestPriority == null || errorCategory.getPriority() < 
result.highestPriority) {
+      result.highestPriority = errorCategory.getPriority();
+      result.highestCategoryName = errorCategory.getCategoryName();
+    }
+  }
+
+  private void applyDefaultCategoryIfNeeded(ClassificationResult result) {
+    boolean shouldUseDefault = result.highestPriority != null && 
result.highestPriority.equals(result.defaultPriority)
+        && !result.unmatched.isEmpty() && defaultErrorCategory != null;
+
+    if (shouldUseDefault) {
+      result.highestCategoryName = defaultErrorCategory.getCategoryName();
+
+      for (Issue issue : result.unmatched) {
+        
result.categoryToIssues.computeIfAbsent(defaultErrorCategory.getCategoryName(), 
k -> new ArrayList<>())
+            .add(issue);
+      }
+    }
+  }
+
+  private Issue buildFinalIssue(String categoryName, Map<String, List<Issue>> 
categoryToIssues) {
+    List<Issue> matchedIssues = categoryToIssues.get(categoryName);
+    String details = buildDetailsString(matchedIssues);
+
+    return Issue.builder().summary("ErrorCategory: " + 
categoryName).details(details).severity(IssueSeverity.ERROR)
+        
.time(ZonedDateTime.now()).code(FINAL_ISSUE_ERROR_CODE).sourceClass(null).exceptionClass(null).properties(null)
+        .build();
+  }
+
+  private String buildDetailsString(List<Issue> issues) {
+    List<String> summaries = new ArrayList<>();
+    int limit = Math.min(maxErrorsInFinalIssue, issues.size());
+
+    for (int i = 0; i < limit; i++) {
+      summaries.add(issues.get(i).getSummary());
+    }
+
+    return String.join(" \n ", summaries);
+  }
+
+  /**
+   * Helper class that stores the result of issue classification, including 
matched categories, unmatched issues, and priority information.
+   */
+  private static class ClassificationResult {
+    Map<String, List<Issue>> categoryToIssues = new HashMap<>();
+    List<Issue> unmatched = new ArrayList<>();
+    Integer highestPriority = null;
+    String highestCategoryName = null;
+    Integer defaultPriority = null;
+  }
+
+  /**
+   * Helper class to store compiled regex for fast matching.
+   */
+  private static class CompiledErrorPattern {
+    private final ErrorPatternProfile issue;
+    private final Pattern pattern;
+
+    CompiledErrorPattern(ErrorPatternProfile issue) {
+      this.issue = issue;
+      try {
+        this.pattern = Pattern.compile(issue.getDescriptionRegex(), 
Pattern.CASE_INSENSITIVE);
+      } catch (PatternSyntaxException e) {
+        log.error("Invalid regex pattern for issue: {}. Error: {}", 
issue.getDescriptionRegex(), e.getMessage());
+        throw new IllegalArgumentException("Invalid regex pattern: " + 
issue.getDescriptionRegex(), e);
+      }
+    }
+
+    boolean matches(String summary) {
+      if (summary == null || summary.isEmpty()) {
+        return false;
+      }
+      return pattern.matcher(summary).find();
+    }
+
+    String getCategoryName() {
+      return issue.getCategoryName();
+    }
+  }
+}
+
+
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
index e9fdebf6a0..1ae6f6f291 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -64,6 +65,14 @@ public class InMemoryIssueRepository implements 
IssueRepository {
     return new ArrayList<>(issues.values());
   }
 
+  @Override
+  public synchronized List<Issue> getMostRecentErrors(int limit)
+      throws TroubleshooterException {
+
+    return issues.values().stream().filter(issue -> issue.getSeverity() == 
IssueSeverity.ERROR)
+        .sorted((a, b) -> 
b.getTime().compareTo(a.getTime())).limit(limit).collect(Collectors.toList());
+  }
+
   @Override
   public synchronized void put(Issue issue)
       throws TroubleshooterException {
@@ -72,7 +81,7 @@ public class InMemoryIssueRepository implements 
IssueRepository {
       if (!reportedOverflow) {
         reportedOverflow = true;
         log.warn("In-memory issue repository has {} elements and is now full. 
New issues will be ignored.",
-                 issues.size());
+            issues.size());
       }
       return;
     }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
index 530798d873..138f98ed42 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
@@ -70,11 +70,23 @@ public class InMemoryMultiContextIssueRepository extends 
AbstractIdleService imp
     return Collections.emptyList();
   }
 
+  @Override
+  public synchronized List<Issue> getMostRecentErrors(String contextId, int 
limit)
+      throws TroubleshooterException {
+
+    InMemoryIssueRepository issueRepository = 
contextIssues.getOrDefault(contextId, null);
+
+    if (issueRepository != null) {
+      return issueRepository.getMostRecentErrors(limit);
+    }
+    return Collections.emptyList();
+  }
+
  @Override
  public synchronized void put(String contextId, Issue issue)
      throws TroubleshooterException {
    // Lazily create the per-context repository, bounded by maxIssuesPerContext.
    InMemoryIssueRepository issueRepository = contextIssues.computeIfAbsent(contextId,
        s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));

    issueRepository.put(issue);
  }
@@ -83,8 +95,8 @@ public class InMemoryMultiContextIssueRepository extends 
AbstractIdleService imp
  public synchronized void put(String contextId, List<Issue> issues)
      throws TroubleshooterException {

    // Lazily create the per-context repository, bounded by maxIssuesPerContext.
    InMemoryIssueRepository issueRepository = contextIssues.computeIfAbsent(contextId,
        s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));

    issueRepository.put(issues);
  }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
index 61f19eeba8..5741b9d662 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
@@ -35,6 +35,9 @@ public interface IssueRepository {
   List<Issue> getAll()
       throws TroubleshooterException;
 
  /**
   * Returns the most recent issues of {@code ERROR} severity, newest first,
   * limited to at most {@code limit} entries.
   */
  List<Issue> getMostRecentErrors(int limit)
      throws TroubleshooterException;
+
   /**
    * Saves an issue to the repository, if it is not yet present.
    *
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueTestDataProvider.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueTestDataProvider.java
new file mode 100644
index 0000000000..8894619d5b
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueTestDataProvider.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+
+import static java.util.Collections.singletonMap;
+
+@Slf4j
+public class IssueTestDataProvider {
+
+  // Test categories
+  public static final List<ErrorCategory> TEST_CATEGORIES = Arrays.asList(
+      new ErrorCategory("USER", 1),
+      new ErrorCategory("SYSTEM INFRA", 2),
+      new ErrorCategory("GAAS", 3),
+      new ErrorCategory("MISC", 4),
+      new ErrorCategory("UNKNOWN", 5),
+      new ErrorCategory("NON FATAL", 6)
+  );
+
+  public static final ErrorCategory TEST_DEFAULT_ERROR_CATEGORY = new 
ErrorCategory("UNKNOWN", 5);
+
+  // Test patterns - you'll need to add all the patterns here
+  public static final List<ErrorPatternProfile> TEST_PATTERNS = Arrays.asList(
+        new ErrorPatternProfile(".*the namespace quota.*", "USER"),
+        new ErrorPatternProfile(".*permission denied.*", "USER"),
+        new ErrorPatternProfile(".*remoteexception.*read only mount point.*", 
"USER"),
+        new ErrorPatternProfile(".*remoteexception: operation category write 
is not supported in state observer.*", "USER"),
+        new ErrorPatternProfile(".*invalidoperationexception: alter is not 
possible.*", "USER"),
+        new ErrorPatternProfile(".*the diskspace quota.*", "USER"),
+        new ErrorPatternProfile(".*filenotfoundexception:.*", "USER"),
+        new ErrorPatternProfile(".*does not exist.*", "USER"),
+        new ErrorPatternProfile("remoteexception: file/directory.*does not 
exist", "USER"),
+        new ErrorPatternProfile("runtimeexception: directory not found.*", 
"USER"),
+        new ErrorPatternProfile(".*icebergtable.tablenotfoundexception.*", 
"USER"),
+        new ErrorPatternProfile("ioexception: topath.*must be an ancestor of 
frompath.*", "USER"),
+        new ErrorPatternProfile("ioexception: origin path.*does not exist.*", 
"USER"),
+        new ErrorPatternProfile("illegalargumentexception: missing source 
iceberg table.*", "USER"),
+        new ErrorPatternProfile("remoteexception:.*already exists \\| failed 
to send re-scaling directive.*", "USER"),
+        new ErrorPatternProfile("illegalargumentexception: when replacing 
prefix, all locations must be descendants of the prefix.*", "GAAS"),
+        new ErrorPatternProfile("illegalargumentexception:.*", "MISC"),
+        new ErrorPatternProfile(".*remoteexception: server too busy.*", 
"SYSTEM INFRA"),
+        new ErrorPatternProfile("jschexception: session is down.*", "SYSTEM 
INFRA"),
+        new ErrorPatternProfile("jschexception: channel request.*", "SYSTEM 
INFRA"),
+        new ErrorPatternProfile(".*safemodeexception.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("remoteexception: no namenode available to 
invoke.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("remoteexception: 
org.apache.hadoop.hdfs.server.federation.router.nonamenodesavailableexception: 
no namenodes available under nameservice.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*no namenodes available under 
nameservice.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*sockettimeoutexception.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*socketexception: connection reset.*", 
"SYSTEM INFRA"),
+        new ErrorPatternProfile("timeoutexception: failed to update metadata 
after 60000 ms.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("networkexception: the server disconnected 
before a response was received.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("webclientrequestwithmessageexception: 
handshake timed out after 10000ms.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("timeoutfailure: message='activity heartbeat 
timeout.*", "GAAS"),
+        new ErrorPatternProfile("timeoutexception: failed to allocate memory 
within the configured max blocking time.*", "GAAS"),
+        new ErrorPatternProfile("timeoutfailure: message='activity 
starttoclose timeout'.*", "GAAS"),
+        new ErrorPatternProfile("timeoutexception: topic 
gobblintrackingevent_sgs not present in metadata after 60000 ms.*", "GAAS"),
+        new ErrorPatternProfile("connectexception: connection refused.*", 
"SYSTEM INFRA"),
+        new ErrorPatternProfile("connectionisclosedexception: no operations 
allowed after connection closed.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("ioexception: failed to get connection for.*is 
already stopped.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("ioexception: unable to close file because 
dfsclient was unable to contact the hdfs servers.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*hikaripool-.*-datasourceprovider - failed 
to execute connection test query.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*interruptedioexception.*", "MISC"),
+        new ErrorPatternProfile("interruptedexception: sleep interrupted.*", 
"MISC"),
+        new ErrorPatternProfile("closedbyinterruptexception.*", "MISC"),
+        new ErrorPatternProfile("interruptedexception:  \\| failed to commit 
writer for partition.*", "MISC"),
+        new ErrorPatternProfile(".*http connection failed for getting token 
from azuread.*", "USER"),
+        new ErrorPatternProfile("ioexception: invalid token.*", "USER"),
+        new ErrorPatternProfile(".*remoteexception: token.*can't be found in 
cache.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*applicationfailure:.*token.*can't be found 
in cache.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*closedchannelexception.*", "MISC"),
+        new ErrorPatternProfile("applicationfailure: message='task failed: 
org.apache.gobblin.runtime.forkexception.*", "MISC"),
+        new ErrorPatternProfile(".*eofexception.*", "MISC"),
+        new ErrorPatternProfile("canceledfailure: child canceled.*", "MISC"),
+        new ErrorPatternProfile(".*message='unable to create new native 
thread'.*", "GAAS"),
+        new ErrorPatternProfile("ioexception: failed to register hive spec 
simplehivespec.*", "USER"),
+        new ErrorPatternProfile("ioexception: error: likely concurrent writing 
to destination: \\(just prior\\) tablemetadata.*", "MISC"),
+        new ErrorPatternProfile(".*nullpointerexception:.*", "MISC"),
+        new ErrorPatternProfile(".*classnotfoundexception.*", "MISC"),
+        new ErrorPatternProfile(".*noclassdeffounderror: could not initialize 
class.*", "MISC"),
+        new ErrorPatternProfile(".*failure in getting work units for job.*", 
"MISC"),
+        new ErrorPatternProfile(".*operation failed: \"the uploaded data is 
not contiguous or the position query parameter value is not equal to the length 
of the file after appending the uploaded data.*", "MISC"),
+        new ErrorPatternProfile(".*applicationfailure: message='task failed: 
operation failed: \"the uploaded data is not contiguous or the position query 
parameter value is not equal to the length of the file after appending the 
uploaded data.*", "MISC"),
+        new ErrorPatternProfile(".*could not create work unit to deregister 
partition.*", "MISC"),
+        new ErrorPatternProfile("runtimeexception: not committing dataset.*", 
"MISC"),
+        new ErrorPatternProfile("job.*has unfinished commit sequences. will 
not clean up staging data", "MISC"),
+        new ErrorPatternProfile(".*failed to commit.*dataset.*", "MISC"),
+        new ErrorPatternProfile(".*could not commit file.*", "MISC"),
+        new ErrorPatternProfile("runtimeexception: failed to parse the 
date.*", "USER"),
+        new ErrorPatternProfile("runtimeexception: clean failed for one or 
more datasets.*", "MISC"),
+        new ErrorPatternProfile("runtimeexception: not copying.*", "MISC"),
+        new ErrorPatternProfile(".*failed to read from all available 
datanodes.*", "NON FATAL"),
+        new ErrorPatternProfile(".*failed to prepare extractor: error - failed 
to get schema for this object.*", "NON FATAL"),
+        new ErrorPatternProfile(".*failed to get primary group for kafkaetl, 
using user name as primary group name.*", "NON FATAL"),
+        new ErrorPatternProfile(".*failed to delete.*", "NON FATAL"),
+        new ErrorPatternProfile(".*applicationfailure: message='task failed: 
java.lang.arrayindexoutofboundsexception.*", "NON FATAL"),
+        new ErrorPatternProfile(".*arrayindexoutofboundsexception.*failed to 
close all open resources", "NON FATAL"),
+        new ErrorPatternProfile("ioexception: failed to clean up all writers. 
\\| failed to close all open resources.*", "MISC"),
+        new ErrorPatternProfile("applicationfailure:.*failed to clean up all 
writers.*", "MISC"),
+        new ErrorPatternProfile("applicationfailure: message='task failed: 
java.io.ioexception: failed to commit all writers..*", "MISC"),
+        new ErrorPatternProfile(".*metaexception.*", "USER"),
+        new ErrorPatternProfile(".*ioexception: filesystem closed.*", "GAAS"),
+        new ErrorPatternProfile(".*source and target table are not 
compatible.*", "USER"),
+        new ErrorPatternProfile("hivetablelocationnotmatchexception: desired 
target location.*do not agree.*", "USER"),
+        new ErrorPatternProfile(".*schema mismatch between metadata.*", 
"USER"),
+        new ErrorPatternProfile(".*could not read all task state files.*", 
"MISC"),
+        new ErrorPatternProfile("taskstatestore successfully opened, but no 
task states found under.*", "MISC"),
+        new ErrorPatternProfile(".*setting task state to failed.*", "MISC"),
+        new ErrorPatternProfile("illegalstateexception: expected end_array.*", 
"GAAS"),
+        new ErrorPatternProfile("illegalstateexception: cannot perform 
operation after producer has been closed.*", "GAAS"),
+        new ErrorPatternProfile("webclientresponse.*503 service 
unavailable.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("webclientresponseexception.*504.*", "SYSTEM 
INFRA"),
+        new 
ErrorPatternProfile("webclientresponsewithmessageexception.*504.*", "SYSTEM 
INFRA"),
+        new ErrorPatternProfile("webclientresponseexception.*400.*", "USER"),
+        new ErrorPatternProfile("webclientresponseexception.*409.*", "USER"),
+        new 
ErrorPatternProfile("webclientresponsewithmessageexception.*402.*", "USER"),
+        new ErrorPatternProfile(".*fsstatestore.*", "MISC"),
+        new ErrorPatternProfile("sqltransientconnectionexception:.*connection 
is not available, request timed out after.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("sqlnontransientconnectionexception: got 
timeout reading communication packets.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("sqlexception: unsupported transaction 
isolation level '-1'.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile("sqlexception: incorrect string 
value:.*for.*", "USER"),
+        new ErrorPatternProfile(".*obstateutils.opentaskstatestoreuncached 
debug.*", "MISC"),
+        new ErrorPatternProfile("[bug] workflow thread.*can't be destroyed in 
time. this will lead to a workflow cache leak. this problem is usually caused 
by a workflow implementation swallowing java.lang.error instead of rethrowing 
it.*", "SYSTEM INFRA"),
+        new ErrorPatternProfile(".*outofmemoryerror.*", "GAAS"),
+        new ErrorPatternProfile("filealreadyexistsexception: rename 
destination.*", "NON FATAL"),
+        new ErrorPatternProfile("applicationfailure: message='failed to 
rename.*", "MISC"),
+        new ErrorPatternProfile("potentialdeadlockexception: potential 
deadlock detected.*", "GAAS"),
+        new ErrorPatternProfile("workflow lock for the run id hasn't been 
released by one of previous execution attempts, consider increasing workflow 
task timeout.*", "GAAS"),
+        new ErrorPatternProfile("nosuchelementexception:  \\| failed to commit 
writer for partition.*", "MISC"),
+        new ErrorPatternProfile("illegalargumentexception: cannot instantiate 
jdbcpublisher since it does not extend singletaskdatapublisher.*", "MISC"),
+        new ErrorPatternProfile("processing 
failure:.*retrystate=retry_state.*commit workflow failure", "MISC"),
+        new ErrorPatternProfile(".*recordtoolargeexception.*", "MISC"),
+        new ErrorPatternProfile("ioexception: could not get block locations. 
source file", "MISC"),
+        new ErrorPatternProfile("ioexception: job status not available  \\| 
failed to launch and run job.*", "MISC"),
+        new ErrorPatternProfile("enqueuing of event.*timed out. sending of 
events is probably stuck", "MISC"),
+        new ErrorPatternProfile("applicationfailure: message='task failed: 
status code: -1 error code.*", "MISC"),
+        new ErrorPatternProfile("applicationfailure: message='failing in 
submitting at least one task before execution.'.*", "MISC"),
+        new ErrorPatternProfile(".*watermark type simple not recognized.*", 
"MISC")
+  );
+
+  public static List<ErrorPatternProfile> getSortedPatterns() {
+    Map<String, Integer> categoryPriority = new HashMap<>();
+    for (ErrorCategory errorCategory : TEST_CATEGORIES) {
+      categoryPriority.put(errorCategory.getCategoryName(), 
errorCategory.getPriority());
+    }
+
+    List<ErrorPatternProfile> sortedPatterns = new ArrayList<>(TEST_PATTERNS);
+    sortedPatterns.sort(Comparator.comparingInt(e -> 
categoryPriority.getOrDefault(e.getCategoryName(), Integer.MAX_VALUE)));
+    return sortedPatterns;
+  }
+
+  public static List<Issue> testUserCategoryIssues() {
+    log.info("Generating test data for USER category issues");
+    List<Issue> issues = new ArrayList<>();
+
+    // Namespace Quota issues
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "ABCD123",
+        "Job failed due to the namespace quota being exceeded for user data", 
"ContextA", "namespaceQuotaError",
+        "quotaexception", singletonMap("keyA", "valueA")));
+
+    // Permission Issues
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), 
IssueSeverity.ERROR, "ABCD123",
+        "Task failed with permission denied when accessing secure directory", 
"ContextB", "permissionError",
+        "securityexception", singletonMap("keyB", "valueB")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "ABCD123",
+        "RemoteException: operation category write is not supported in state 
observer mode", "ContextC",
+        "writePermissionError", "remoteexception", singletonMap("keyC", 
"valueC")));
+
+    // Diskspace Quota Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "ABCD123",
+        "Operation failed because the diskspace quota has been reached", 
"ContextD", "diskQuotaError", "quotaexception",
+        singletonMap("keyD", "valueD")));
+
+    // File Not Found Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(4), 
IssueSeverity.ERROR, "ABCD123",
+        "FileNotFoundException: /path/to/missing/file.txt not found", 
"ContextE", "fileNotFoundError",
+        "filenotfoundexception", singletonMap("keyE", "valueE")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(5), 
IssueSeverity.ERROR, "ABCD123",
+        "RemoteException: file/directory /data/temp does not exist", 
"ContextF", "directoryNotFoundError",
+        "remoteexception", singletonMap("keyF", "valueF")));
+
+    // Token Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(6), 
IssueSeverity.ERROR, "ABCD123",
+        "HTTP connection failed for getting token from AzureAD authentication 
service", "ContextG", "tokenError",
+        "authexception", singletonMap("keyG", "valueG")));
+
+    return issues;
+  }
+
+  public static List<Issue> testSystemInfraCategoryIssues() {
+    // Returns a list of Issues that should match SYSTEM INFRA category regex 
patterns
+    log.info("Generating test data for SYSTEM_INFRA category issues");
+    List<Issue> issues = new ArrayList<>();
+
+    // Server Issues
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, 
"SYSTEM_INFRA",
+        "RemoteException: server too busy to handle request at this time", 
"ContextA", "serverBusyError",
+        "remoteexception", singletonMap("keyA", "valueA")));
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "JSchException: session is down and cannot be restored", "ContextB", 
"sessionDownError", "jschexception",
+        singletonMap("keyB", "valueB")));
+
+    // Namenode Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "SafeModeException: Name node is in safe mode and cannot accept 
changes", "ContextC", "safeModeError",
+        "safemodeexception", singletonMap("keyC", "valueC")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "RemoteException: no namenode available to invoke for cluster 
operations", "ContextD", "namenodeError",
+        "remoteexception", singletonMap("keyD", "valueD")));
+
+    // Timeout Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(4), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "SocketTimeoutException: Read timeout after 30000ms waiting for 
response", "ContextE", "timeoutError",
+        "sockettimeoutexception", singletonMap("keyE", "valueE")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(5), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "SocketException: connection reset by peer during data transfer", 
"ContextF", "connectionResetError",
+        "socketexception", singletonMap("keyF", "valueF")));
+
+    // Connection Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(6), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "ConnectException: connection refused when trying to connect to remote 
service", "ContextG",
+        "connectionRefusedError", "connectexception", singletonMap("keyG", 
"valueG")));
+
+    return issues;
+  }
+
+  public static List<Issue> testGaasCategoryIssues() {
+    // Returns a list of Issues that should match GAAS category regex patterns
+    log.info("Generating test data for GAAS category issues");
+    List<Issue> issues = new ArrayList<>();
+
+    String testString = "TimeoutFailure: message='activity heartbeat timeout 
exceeded configured limit'"; // TBD: DELETE
+    // Timeout Issues
+    issues.add(
+        new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "GAAS", 
testString, "ContextA", "activityTimeoutError",
+            "timeoutfailure", singletonMap("keyA", "valueA")));
+
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), 
IssueSeverity.ERROR, "GAAS",
+        "TimeoutException: failed to allocate memory within the configured max 
blocking time of 5000ms", "ContextB",
+        "memoryTimeoutError", "timeoutexception", singletonMap("keyB", 
"valueB")));
+
+    // Thread Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "GAAS",
+        "ApplicationFailure: message='unable to create new native thread - 
system limits reached'", "ContextC",
+        "threadCreationError", "applicationfailure", singletonMap("keyC", 
"valueC")));
+
+    // Filesystem Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "GAAS",
+        "IOException: filesystem closed unexpectedly during operation", 
"ContextD", "filesystemClosedError",
+        "ioexception", singletonMap("keyD", "valueD")));
+
+    // Illegal State Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(4), 
IssueSeverity.ERROR, "GAAS",
+        "IllegalStateException: expected END_ARRAY but found different token 
type", "ContextE", "illegalStateError",
+        "illegalstateexception", singletonMap("keyE", "valueE")));
+
+    return issues;
+  }
+
+  public static List<Issue> testMiscCategoryIssues() {
+    // Returns a list of Issues that should match MISC category regex patterns
+    log.info("Generating test data for MISC category issues");
+    List<Issue> issues = new ArrayList<>();
+
+    // Interruption Issues
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.WARN, "MISC",
+        "InterruptedIOException occurred during file operation", "ContextA", 
"interruptedIOError",
+        "interruptedioexception", singletonMap("keyA", "valueA")));
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), IssueSeverity.WARN, 
"MISC",
+        "InterruptedException: sleep interrupted by external signal", 
"ContextB", "sleepInterruptedError",
+        "interruptedexception", singletonMap("keyB", "valueB")));
+
+    // Null Pointer Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "MISC",
+        "NullPointerException: attempt to invoke method on null object 
reference", "ContextC", "nullPointerError",
+        "nullpointerexception", singletonMap("keyC", "valueC")));
+
+    // Class Not Found Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "MISC",
+        "ClassNotFoundException: com.example.MissingClass not found in 
classpath", "ContextD", "classNotFoundError",
+        "classnotfoundexception", singletonMap("keyD", "valueD")));
+
+    // Concurrent Write Issues
+    issues.add(new Issue(ZonedDateTime.now().minusHours(4), 
IssueSeverity.ERROR, "MISC",
+        "IOException: error: likely concurrent writing to destination: (just 
prior) TableMetadata collision detected",
+        "ContextE", "concurrentWriteError", "ioexception", 
singletonMap("keyE", "valueE")));
+
+    return issues;
+  }
+
+  public static List<Issue> testNonFatalCategoryIssues() {
+    log.info("Generating test data for NON FATAL category issues");
+    // Returns a list of Issues that should match NON FATAL category regex 
patterns
+    List<Issue> issues = new ArrayList<>();
+
+
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "NON FATAL",
+        "filealreadyexistsexception: rename destination ", "ContextA", 
"copyDataWarning",
+        "copydatapublisher", singletonMap("keyA", "valueA")));
+
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), 
IssueSeverity.ERROR, "NON FATAL",
+        "Warning: failed to read from all available datanodes, retrying with 
backup nodes", "ContextB",
+        "datanodeReadWarning", "ioexception", singletonMap("keyB", "valueB")));
+
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "NON FATAL",
+        "Info: failed to delete temporary file - cleanup will retry later", 
"ContextC", "deleteFailureInfo",
+        "ioexception", singletonMap("keyC", "valueC")));
+
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "NON FATAL",
+        "1234: arrayindexoutofboundsexception. Operation failed to close all 
open resources", "ContextD",
+        "reporterException", "reporterexception", singletonMap("keyD", 
"valueD")));
+
+    return issues;
+  }
+
+  public static List<Issue> testNegativeMatchCases() {
+    log.info("Generating test data for negative match cases");
+    // Returns a list of Issues that should NOT match any specific regex 
patterns
+    List<Issue> issues = new ArrayList<>();
+
+    // Generic success messages
+    issues.add(
+        new Issue(ZonedDateTime.now(), IssueSeverity.INFO, "SUCCESS", "Job 
completed successfully without any errors",
+            "ContextA", "successInfo", "success", singletonMap("keyA", 
"valueA")));
+
+    // Custom application errors not in regex patterns
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), 
IssueSeverity.ERROR, "CUSTOM",
+        "CustomBusinessLogicException: validation failed for user input", 
"ContextB", "customError",
+        "businessexception", singletonMap("keyB", "valueB")));
+
+    // Generic warnings that don't match patterns
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.WARN, "GENERIC",
+        "Process completed with warnings but no critical errors detected", 
"ContextC", "genericWarning", "warning",
+        singletonMap("keyC", "valueC")));
+
+    // Unrelated error messages
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "UNRELATED",
+        "Database connection pool exhausted - increase pool size 
configuration", "ContextD", "poolError",
+        "poolexception", singletonMap("keyD", "valueD")));
+
+    return issues;
+  }
+
+  public static List<Issue> testEdgeCasesAndBoundaryConditions() {
+    log.info("Generating test data for edge cases and boundary conditions");
+    // Returns a list of Issues to test edge cases and boundary conditions
+    List<Issue> issues = new ArrayList<>();
+
+    // Case sensitivity tests
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "CASE_TEST",
+        "FILENOTFOUNDEXCEPTION: uppercase version of file not found error", 
"ContextA", "uppercaseError",
+        "filenotfoundexception", singletonMap("keyA", "valueA")));
+
+    // Partial pattern matches
+    issues.add(new Issue(ZonedDateTime.now().minusDays(1), 
IssueSeverity.ERROR, "PARTIAL_TEST",
+        "This is a permission error but not exactly permission denied", 
"ContextB", "partialPermissionError",
+        "securityexception", singletonMap("keyB", "valueB")));
+
+    // Multiple pattern matches in single description
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "MULTI_MATCH",
+        "FileNotFoundException: permission denied when accessing the namespace 
quota restricted file", "ContextC",
+        "multiMatchError", "complexexception", singletonMap("keyC", 
"valueC")));
+
+    // Empty and null-like descriptions
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.ERROR, "EMPTY_TEST", "", "ContextD",
+        "emptyDescriptionError", "unknownexception", singletonMap("keyD", 
"valueD")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(4), 
IssueSeverity.ERROR, "NULL_TEST", null, "ContextE",
+        "nullDescriptionError", "unknownexception", singletonMap("keyE", 
"valueE")));
+
+    // Very long descriptions
+    issues.add(new Issue(ZonedDateTime.now().minusHours(5), 
IssueSeverity.ERROR, "LONG_TEST",
+        "This is a very long error description that contains the phrase 
'permission denied' somewhere in the middle of a lot of other text that might 
make pattern matching more challenging to ensure our regex patterns work 
correctly even with verbose error messages",
+        "ContextF", "longDescriptionError", "verboseexception", 
singletonMap("keyF", "valueF")));
+
+    return issues;
+  }
+
+  public static List<Issue> testNonFatalAndUnknownMix() {
+    // Returns a list of Issues that should match NON FATAL and UNKNOWN 
categories
+    List<Issue> issues = new ArrayList<>();
+
+    // NON FATAL issues
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "NON FATAL",
+        "applicationfailure: message='task failed: 
java.lang.arrayindexoutofboundsexception"
+            + "ues", "ContextA", "copyDataWarning",
+        "copydatapublisher", singletonMap("keyA", "valueA")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(1), 
IssueSeverity.ERROR, "NON FATAL",
+        "Info: failed to delete temporary file - cleanup will retry later", 
"ContextB", "deleteFailureInfo",
+        "ioexception", singletonMap("keyB", "valueB")));
+
+    // UNKNOWN issues (should not match any known category)
+    issues.add(new Issue(ZonedDateTime.now().minusHours(2), 
IssueSeverity.ERROR, "UNKNOWN",
+        "CustomApplicationException: unexpected error occurred in module", 
"ContextC", "customError", "customexception",
+        singletonMap("keyC", "valueC")));
+    issues.add(new Issue(ZonedDateTime.now().minusHours(3), 
IssueSeverity.WARN, "UNKNOWN",
+        "Process completed with warnings but no critical errors detected", 
"ContextD", "genericWarning", "warning",
+        singletonMap("keyD", "valueD")));
+
+    return issues;
+  }
+
+  public static List<Issue> testMaximumErrorCountExceeded() {
+    // Returns a list of Issues where the number of errors exceeds the maximum 
allowed count
+    // This should trigger overflow/limit logic to handle excessive error 
volumes
+    List<Issue> issues = new ArrayList<>();
+
+    // Add 15 USER errors (highest priority)
+    for (int i = 0; i < 15; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(i), 
IssueSeverity.ERROR, "USER",
+          "FileNotFoundException: /path/to/missing/file" + i + ".txt not 
found", "ContextU" + i,
+          "fileNotFoundError" + i, "filenotfoundexception", 
singletonMap("keyU" + i, "valueU" + i)));
+    }
+
+    // Add 15 SYSTEM INFRA errors
+    for (int i = 0; i < 15; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(15 + i), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+          "RemoteException: server too busy to handle request " + i, 
"ContextS" + i, "serverBusyError" + i,
+          "remoteexception", singletonMap("keyS" + i, "valueS" + i)));
+    }
+
+    // Add 15 GAAS errors
+    for (int i = 0; i < 15; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(30 + i), 
IssueSeverity.ERROR, "GAAS",
+          "TimeoutFailure: message='activity heartbeat timeout " + i + "'", 
"ContextG" + i, "timeoutError" + i,
+          "timeoutfailure", singletonMap("keyG" + i, "valueG" + i)));
+    }
+
+    // Add 15 MISC errors
+    for (int i = 0; i < 15; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(45 + i), 
IssueSeverity.ERROR, "MISC",
+          "NullPointerException: null reference in operation " + i, "ContextM" 
+ i, "nullPointerError" + i,
+          "nullpointerexception", singletonMap("keyM" + i, "valueM" + i)));
+    }
+
+    // Add 10 NON FATAL errors (lower priority)
+    for (int i = 0; i < 10; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(60 + i), 
IssueSeverity.WARN, "NON FATAL",
+          "Warning: failed to delete temporary file " + i + " - cleanup will 
retry later", "ContextN" + i,
+          "deleteFailureWarning" + i, "ioexception", singletonMap("keyN" + i, 
"valueN" + i)));
+    }
+
+    return issues;
+  }
+
+  public static List<Issue> testMinimumErrorCountBelowThreshold() {
+    // Returns a list of Issues where the number of errors is less than the 
maximum threshold required for a category
+    // This should not trigger category match or should be handled gracefully
+    List<Issue> issues = new ArrayList<>();
+
+    // Single error from each category - assuming maximum threshold is > 1
+    // These should either not be classified or handled with special logic
+
+    // Single USER error
+    issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "USER",
+        "FileNotFoundException: /single/missing/file.txt not found", 
"ContextU1", "singleFileNotFoundError",
+        "filenotfoundexception", singletonMap("keyU1", "valueU1")));
+
+    // Single SYSTEM INFRA error
+    issues.add(new Issue(ZonedDateTime.now().minusMinutes(5), 
IssueSeverity.ERROR, "SYSTEM_INFRA",
+        "RemoteException: server too busy - single occurrence", "ContextS1", 
"singleServerBusyError", "remoteexception",
+        singletonMap("keyS1", "valueS1")));
+
+    // Single GAAS error
+    issues.add(new Issue(ZonedDateTime.now().minusMinutes(10), 
IssueSeverity.ERROR, "GAAS",
+        "TimeoutFailure: message='single activity heartbeat timeout'", 
"ContextG1", "singleTimeoutError",
+        "timeoutfailure", singletonMap("keyG1", "valueG1")));
+
+    // Single MISC error
+    issues.add(new Issue(ZonedDateTime.now().minusMinutes(15), 
IssueSeverity.ERROR, "MISC",
+        "NullPointerException: single null reference occurrence", "ContextM1", 
"singleNullPointerError",
+        "nullpointerexception", singletonMap("keyM1", "valueM1")));
+
+    // Single NON FATAL error
+    issues.add(new Issue(ZonedDateTime.now().minusMinutes(20), 
IssueSeverity.ERROR, "NON FATAL",
+        "Warning: single failed delete operation - cleanup will retry", 
"ContextN1", "singleDeleteFailureWarning",
+        "ioexception", singletonMap("keyN1", "valueN1")));
+
+    return issues;
+  }
+
+  public static List<Issue> 
testMixedCategoryIssuesFromLowestToHighestPriority() {
+    // Returns a list of Issues exceeding the display/processing limit, 
ordered from lowest to highest priority
+    List<Issue> issues = new ArrayList<>();
+
+    // Add 5 UNKNOWN (lowest priority) - errors that don't match any regex 
patterns
+    String[] unknownErrors =
+        {"CustomBusinessLogicException: validation failed for user input", 
"DatabaseConnectionException: connection pool exhausted", 
"ConfigurationException: invalid property value detected", "SecurityException: 
authentication failed with external service", "NetworkException: unable to 
reach external API endpoint"};
+    for (int i = 0; i < 5; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(60 + i), 
IssueSeverity.INFO, "UNKNOWN", unknownErrors[i],
+          "ContextU" + i, "unknownError" + i, "unknownexception", 
singletonMap("keyU" + i, "valueU" + i)));
+    }
+
+    // Add 5 NON FATAL - errors that match NON FATAL regex patterns
+    String[] nonFatalErrors =
+        {"CopyDataPublisher warning: srcfs operation completed with minor 
issues", "Warning: failed to read from all available datanodes, retrying with 
backup nodes", "Info: failed to delete temporary file - cleanup will retry 
later", "Exception thrown from InGraphsReporter#report. Exception was 
suppressed and processing continued", "ApplicationFailure: message='task 
failed: java.lang.ArrayIndexOutOfBoundsException at index 5'"};
+    for (int i = 0; i < 5; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(50 + i), 
IssueSeverity.WARN, "NON FATAL", nonFatalErrors[i],
+          "ContextN" + i, "nonFatalError" + i, "nonfatalexception", 
singletonMap("keyN" + i, "valueN" + i)));
+    }
+
+    // Add 5 MISC - errors that match MISC regex patterns
+    String[] miscErrors =
+        {"InterruptedIOException occurred during file operation", 
"NullPointerException: attempt to invoke method on null object reference", 
"ClassNotFoundException: com.example.MissingClass not found in classpath", 
"IOException: error: likely concurrent writing to destination: (just prior) 
TableMetadata collision detected", "ApplicationFailure: failed to clean up all 
writers during shutdown"};
+    for (int i = 0; i < 5; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(40 + i), 
IssueSeverity.ERROR, "MISC", miscErrors[i],
+          "ContextM" + i, "miscError" + i, "miscexception", 
singletonMap("keyM" + i, "valueM" + i)));
+    }
+
+    // Add 5 GAAS - errors that match GAAS regex patterns
+    String[] gaasErrors =
+        {"TimeoutFailure: message='activity heartbeat timeout exceeded 
configured limit'", "TimeoutException: failed to allocate memory within the 
configured max blocking time of 5000ms", "ApplicationFailure: message='unable 
to create new native thread - system limits reached'", "IOException: filesystem 
closed unexpectedly during operation", "IllegalStateException: expected 
END_ARRAY but found different token type"};
+    for (int i = 0; i < 5; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(30 + i), 
IssueSeverity.ERROR, "GAAS", gaasErrors[i],
+          "ContextG" + i, "gaasError" + i, "gaasexception", 
singletonMap("keyG" + i, "valueG" + i)));
+    }
+
+    // Add 5 SYSTEM INFRA - errors that match SYSTEM INFRA regex patterns
+    String[] systemInfraErrors =
+        {"RemoteException: server too busy to handle request at this time", 
"JSchException: session is down and cannot be restored", "SafeModeException: 
Name node is in safe mode and cannot accept changes", "SocketTimeoutException: 
Read timeout after 30000ms waiting for response", "ConnectException: connection 
refused when trying to connect to remote service"};
+    for (int i = 0; i < 5; i++) {
+      issues.add(
+          new Issue(ZonedDateTime.now().minusMinutes(20 + i), 
IssueSeverity.ERROR, "SYSTEM_INFRA", systemInfraErrors[i],
+              "ContextS" + i, "systemInfraError" + i, "systeminfraexception", 
singletonMap("keyS" + i, "valueS" + i)));
+    }
+
+    // Add 5 USER (highest priority) - errors that match USER regex patterns
+    String[] userErrors =
+        {"Job failed due to the namespace quota being exceeded for user data", 
"Task failed with permission denied when accessing secure directory", 
"Operation failed because the diskspace quota has been reached", 
"FileNotFoundException: /path/to/missing/file.txt not found", "HTTP connection 
failed for getting token from AzureAD authentication service"};
+    for (int i = 0; i < 5; i++) {
+      issues.add(new Issue(ZonedDateTime.now().minusMinutes(10 + i), 
IssueSeverity.ERROR, "USER", userErrors[i],
+          "ContextUR" + i, "userError" + i, "userexception", 
singletonMap("keyUR" + i, "valueUR" + i)));
+    }
+
+    return issues;
+  }
+}
+
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
index a6e729200f..ea26ede30f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime.troubleshooter;
 
+import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
 
@@ -29,6 +30,8 @@ import com.typesafe.config.Config;
 import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.service.ServiceConfigKeys;
+
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.util.GsonUtils;
@@ -40,7 +43,7 @@ import org.apache.gobblin.util.ConfigUtils;
  *
 *  It will additionally log received issues, so that they can be processed by 
an analytical system to determine
  *  the overall platform health.
- * */
+ **/
 @Slf4j
 public class JobIssueEventHandler {
 
@@ -53,14 +56,17 @@ public class JobIssueEventHandler {
   private final MultiContextIssueRepository issueRepository;
   private final boolean logReceivedEvents;
 
+  private final Config config;
+
   @Inject
   public JobIssueEventHandler(MultiContextIssueRepository issueRepository, 
Config config) {
-    this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS, 
true));
+    this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS, 
true), config);
   }
 
-  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, 
boolean logReceivedEvents) {
+  public JobIssueEventHandler(MultiContextIssueRepository issueRepository, 
boolean logReceivedEvents, Config config) {
     this.issueRepository = Objects.requireNonNull(issueRepository);
     this.logReceivedEvents = logReceivedEvents;
+    this.config = config;
   }
 
   public void processEvent(GobblinTrackingEvent event) {
@@ -83,8 +89,9 @@ public class JobIssueEventHandler {
     try {
       issueRepository.put(contextId, issueEvent.getIssue());
     } catch (TroubleshooterException e) {
-      log.warn(String.format("Failed to save issue to repository. Issue time: 
%s, code: %s",
-                             issueEvent.getIssue().getTime(), 
issueEvent.getIssue().getCode()), e);
+      log.warn(
+          String.format("Failed to save issue to repository. Issue time: %s, 
code: %s", issueEvent.getIssue().getTime(),
+              issueEvent.getIssue().getCode()), e);
     }
 
     if (logReceivedEvents) {
@@ -114,4 +121,23 @@ public class JobIssueEventHandler {
     String jobName;
     Issue issue;
   }
-}
+
+  public List<Issue> getErrorListForClassification(String contextId)
+      throws TroubleshooterException {
+          int limit = ConfigUtils.getInt(config, 
ServiceConfigKeys.ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS_KEY,
+        ServiceConfigKeys.DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS);
+      return issueRepository.getMostRecentErrors(contextId, limit);
+  }
+
+  public void logFinalError(Issue issue, String flowName, String flowGroup, 
String flowExecutionId, String jobName) {
+    JobIssueLogEntry logEntry = new JobIssueLogEntry();
+    logEntry.issue = issue;
+    logEntry.flowName = flowName;
+    logEntry.flowGroup = flowGroup;
+    logEntry.flowExecutionId = flowExecutionId;
+    logEntry.jobName = jobName;
+
+    String serializedIssueEvent = 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(logEntry);
+    issueLogger.info(serializedIssueEvent);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
index 1d30ed3131..0b6f2279e4 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
@@ -37,6 +37,12 @@ public interface MultiContextIssueRepository extends Service 
{
   List<Issue> getAll(String contextId)
       throws TroubleshooterException;
 
+  /**
+   * Returns issues with severity ERROR, in reverse order from how they 
were put into the repository.
+   * */
+  List<Issue> getMostRecentErrors(String contextId, int limit)
+      throws TroubleshooterException;
+
   void put(String contextId, Issue issue)
       throws TroubleshooterException;
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
index f47f9085fb..b3716d66f0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
@@ -29,6 +29,12 @@ public class NoopIssueRepository implements IssueRepository {
     return Collections.emptyList();
   }
 
+  @Override
+  public List<Issue> getMostRecentErrors(int limit)
+      throws TroubleshooterException {
+    return Collections.emptyList();
+  }
+
   @Override
   public void put(Issue issue)
       throws TroubleshooterException {
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/ErrorClassifierTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/ErrorClassifierTest.java
new file mode 100644
index 0000000000..9ba3b56615
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/ErrorClassifierTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.metastore.InMemoryErrorPatternStore;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueTestDataProvider;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import static org.testng.Assert.*;
+
+
+public class ErrorClassifierTest {
+  private ErrorClassifier classifier;
+  private List<ErrorCategory> categories;
+  private ErrorCategory _defaultErrorCategory;
+  private InMemoryErrorPatternStore store;
+
+  @BeforeMethod
+  public void setUp()
+      throws IOException {
+    // Use the shared test data
+    categories = IssueTestDataProvider.TEST_CATEGORIES;
+    _defaultErrorCategory = IssueTestDataProvider.TEST_DEFAULT_ERROR_CATEGORY;
+    List<ErrorPatternProfile> errorPatternProfiles = 
IssueTestDataProvider.getSortedPatterns();
+
+    Config testConfig = ConfigFactory.empty();
+    store = new InMemoryErrorPatternStore(testConfig);
+    store.upsertCategory(categories);
+    store.upsertPatterns(errorPatternProfiles);
+    store.setDefaultCategory(_defaultErrorCategory);
+
+    classifier = new ErrorClassifier(store, testConfig);
+  }
+
+  @Test
+  public void testUserCategoryIssues() {
+    List<Issue> issues = IssueTestDataProvider.testUserCategoryIssues();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("USER"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+    assertTrue(
+        result.getDetails().contains("permission denied") || 
result.getDetails().contains("FileNotFoundException"));
+  }
+
+  @Test
+  public void testSystemInfraCategoryIssues() {
+    List<Issue> issues = IssueTestDataProvider.testSystemInfraCategoryIssues();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("SYSTEM INFRA"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+    assertTrue(result.getDetails().contains("server too busy") || 
result.getDetails().contains("SafeModeException"));
+  }
+
+  @Test
+  public void testGaasCategoryIssues() {
+    List<Issue> issues = IssueTestDataProvider.testGaasCategoryIssues();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("GAAS"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+    assertTrue(result.getDetails().contains("TimeoutFailure") || 
result.getDetails().contains("ApplicationFailure"));
+  }
+
+  @Test
+  public void testMiscCategoryIssues() {
+    List<Issue> issues = IssueTestDataProvider.testMiscCategoryIssues();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("MISC"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+    assertTrue(
+        result.getDetails().contains("NullPointerException") || 
result.getDetails().contains("InterruptedIOException"));
+  }
+
+  @Test
+  public void testNonFatalCategoryIssues() {
+    List<Issue> issues = IssueTestDataProvider.testNonFatalCategoryIssues();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("NON FATAL"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+    assertTrue(result.getDetails().contains("CopyDataPublisher") || 
result.getDetails()
+        .contains("failed to delete temporary file"));
+  }
+
+  @Test
+  public void testNegativeMatchCases() {
+    List<Issue> issues = IssueTestDataProvider.testNegativeMatchCases();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("UNKNOWN"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+  }
+
+  @Test
+  public void testEdgeCasesAndBoundaryConditions() {
+    List<Issue> issues = 
IssueTestDataProvider.testEdgeCasesAndBoundaryConditions();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("USER"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+  }
+
+  @Test
+  public void testNonFatalAndUnknownMix() {
+    List<Issue> issues = IssueTestDataProvider.testNonFatalAndUnknownMix();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+
+    assertTrue(result.getSummary().contains("UNKNOWN"));
+
+  }
+
+  @Test
+  public void testMaximumErrorCountExceeded() {
+    List<Issue> issues = IssueTestDataProvider.testMaximumErrorCountExceeded();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("USER"));
+
+    assertTrue(result.getDetails().contains("/path/to/missing/file1"));
+    assertTrue(result.getDetails().contains("/path/to/missing/file9"));
+
+    assertFalse(result.getDetails().contains("/path/to/missing/file11"));
+    assertFalse(result.getDetails().contains("/path/to/missing/file14"));
+
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+  }
+
+  @Test
+  public void testMinimumErrorCountBelowThreshold() {
+    List<Issue> issues = 
IssueTestDataProvider.testMinimumErrorCountBelowThreshold();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+
+    assertTrue(result.getSummary().contains("USER"));
+    assertTrue(result.getDetails().contains("FileNotFoundException"));
+
+    assertFalse(result.getDetails().contains("server too busy"));
+
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+  }
+
+  @Test
+  public void testMixedCategoryIssuesFromLowestToHighestPriority() {
+    List<Issue> issues = 
IssueTestDataProvider.testMixedCategoryIssuesFromLowestToHighestPriority();
+    Issue result = classifier.classifyEarlyStopWithDefault(issues);
+    assertNotNull(result);
+    assertTrue(result.getSummary().contains("USER"));
+    assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+  }
+}
+
+
+
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
index 616d8de0ba..3901ca1999 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.runtime.troubleshooter;
 
 import org.testng.annotations.Test;
 
+import com.typesafe.config.ConfigFactory;
+
 import org.apache.gobblin.metrics.event.TimingEvent;
 
 import static org.mockito.Mockito.any;
@@ -32,7 +34,7 @@ public class JobIssueEventHandlerTest {
   public void canHandleIssue()
       throws Exception {
     MultiContextIssueRepository issueRepository = 
mock(MultiContextIssueRepository.class);
-    JobIssueEventHandler eventHandler = new 
JobIssueEventHandler(issueRepository, true);
+    JobIssueEventHandler eventHandler = new 
JobIssueEventHandler(issueRepository, true, ConfigFactory.empty());
 
     IssueEventBuilder eventBuilder = new IssueEventBuilder("TestJob");
     eventBuilder.setIssue(getTestIssue("test issue", "code1"));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index b428962c1b..b78db5b1f7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -35,6 +35,7 @@ import com.typesafe.config.Config;
 import javax.inject.Singleton;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.ErrorClassifier;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
@@ -93,6 +94,8 @@ import 
org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
 import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitorFactory;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.metastore.ErrorPatternStore;
+import org.apache.gobblin.metastore.InMemoryErrorPatternStore;
 
 
 public class GobblinServiceGuiceModule implements Module {
@@ -186,8 +189,12 @@ public class GobblinServiceGuiceModule implements Module {
 
     if (serviceConfig.isJobStatusMonitorEnabled()) {
       
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+      binder.bind(ErrorClassifier.class);
+      binder.bind(ErrorPatternStore.class)
+          .to(getClassByNameOrAlias(ErrorPatternStore.class, 
serviceConfig.getInnerConfig(),
+              ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS,
+              InMemoryErrorPatternStore.class.getName()));
     }
-
     binder.bind(FlowStatusGenerator.class);
 
     if (serviceConfig.isSchedulerEnabled()) {
@@ -266,4 +273,4 @@ public class GobblinServiceGuiceModule implements Module {
           .build();
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
index 3c4c60d934..b6ba738767 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
@@ -123,6 +123,53 @@ public class MySqlMultiContextIssueRepository extends 
AbstractIdleService implem
     }
   }
 
+  @Override
+  public List<Issue> getMostRecentErrors(String contextId, int limit)
+      throws TroubleshooterException {
+    Objects.requireNonNull(contextId, "contextId should not be null");
+
+    String querySql = "select code, time, severity, summary, details, 
source_class, exception_class, properties "
+        + "from issues where context_id = ? and severity = 'ERROR' order by 
time desc limit ?";
+
+    try (Connection connection = 
databaseProvider.getDatasource().getConnection();
+        PreparedStatement statement = connection.prepareStatement(querySql)) {
+
+      statement.setString(1, contextId);
+      statement.setLong(2, limit);
+
+      ArrayList<Issue> issues = new ArrayList<>();
+
+      try (ResultSet results = statement.executeQuery()) {
+        while (results.next()) {
+          Issue.IssueBuilder issue = Issue.builder();
+          issue.code(results.getString(1));
+          
issue.time(ZonedDateTime.ofInstant(Instant.ofEpochMilli(results.getTimestamp(2).getTime()),
 ZoneOffset.UTC));
+          issue.severity(IssueSeverity.valueOf(results.getString(3)));
+          issue.summary(results.getString(4));
+          issue.details(results.getString(5));
+          issue.sourceClass(results.getString(6));
+          issue.exceptionClass(results.getString(7));
+
+          String serializedProperties = results.getString(8);
+          if (serializedProperties != null) {
+            Type mapType = new TypeToken<HashMap<String, String>>() {
+            }.getType();
+
+            HashMap<String, String> properties =
+                
GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(serializedProperties, mapType);
+            issue.properties(properties);
+          }
+
+          issues.add(issue.build());
+        }
+      }
+
+      return issues;
+    } catch (SQLException e) {
+      throw new TroubleshooterException("Cannot read issues from the 
database", e);
+    }
+  }
+
   @Override
   public void put(String contextId, Issue issue)
       throws TroubleshooterException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 74a864da37..6d567dc5b7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -44,6 +44,7 @@ import 
org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.ErrorClassifier;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -66,9 +67,9 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
   private Meter messageParseFailures;
 
   public KafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads, JobIssueEventHandler jobIssueEventHandler,
-      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore)
+      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore, ErrorClassifier 
errorClassifier)
       throws IOException, ReflectiveOperationException {
-    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer, dagManagementStateStore);
+    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer, dagManagementStateStore, errorClassifier);
 
     if (ConfigUtils.getBoolean(config, 
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
       KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new 
KafkaAvroSchemaRegistryFactory().
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index f4a18d4e2f..c2afaa0498 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.inject.Inject;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -58,11 +59,14 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.ErrorClassifier;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
 import org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -89,8 +93,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
   //We use table suffix that is different from the Gobblin job state store 
suffix of jst to avoid confusion.
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
-  public static final String GET_AND_SET_JOB_STATUS = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-      JOB_STATUS_MONITOR_PREFIX,  "getAndSetJobStatus");
+  public static final String GET_AND_SET_JOB_STATUS =
+      MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus");
 
   private static final String PROCESS_JOB_ISSUE = MetricRegistry
       .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
JOB_STATUS_MONITOR_PREFIX, "jobIssueProcessingTime");
@@ -124,9 +128,12 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   private final GaaSJobObservabilityEventProducer eventProducer;
   private final DagManagementStateStore dagManagementStateStore;
   private final List<Class<? extends Exception>> nonRetryableExceptions = 
Collections.singletonList(SQLIntegrityConstraintViolationException.class);
+  private final boolean isErrorClassificationEnabled;
+  private final ErrorClassifier errorClassifier;
 
+  @Inject
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler,
-      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore)
+      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore, ErrorClassifier 
errorClassifier)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
     String stateStoreFactoryClass = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, 
FileContextBasedFsStateStoreFactory.class.getName());
@@ -137,11 +144,13 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
     this.jobIssueEventHandler = jobIssueEventHandler;
     this.dagManagementStateStore = dagManagementStateStore;
+    this.errorClassifier = errorClassifier;
 
     Config retryerOverridesConfig = 
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         : ConfigFactory.empty();
     // log exceptions to expose errors we suffer under and/or guide 
intervention when resolution not readily forthcoming
+
     this.persistJobStatusRetryer =
         
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
 Optional.of(new RetryListener() {
           @Override
@@ -153,7 +162,9 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
             }
           }
         }));
-        this.eventProducer = observabilityEventProducer;
+    this.eventProducer = observabilityEventProducer;
+    this.isErrorClassificationEnabled =
+        ConfigUtils.getBoolean(this.config, 
ServiceConfigKeys.ERROR_CLASSIFICATION_ENABLED_KEY, false);
   }
 
   public enum NewState {
@@ -177,13 +188,13 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
   @Override
   public void shutDown() {
-     super.shutDown();
-     this.scheduledExecutorService.shutdown();
-     try {
-       this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
-     } catch (InterruptedException e) {
-       log.error("Exception encountered when shutting down state store 
cleaner", e);
-     }
+    super.shutDown();
+    this.scheduledExecutorService.shutdown();
+    try {
+      this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      log.error("Exception encountered when shutting down state store 
cleaner", e);
+    }
   }
 
   @Override
@@ -234,6 +245,28 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && 
!retryRequired) {
+            if (isErrorClassificationEnabled) {
+              if 
(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()))
 {
+                long startTime = System.currentTimeMillis();
+                try {
+                    List<Issue> issues = 
jobIssueEventHandler.getErrorListForClassification(
+                        
TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    Issue finalIssue = 
errorClassifier.classifyEarlyStopWithDefault(issues);
+                    if (finalIssue != null) {
+                      jobIssueEventHandler.logFinalError(finalIssue, flowName, 
flowGroup, String.valueOf(flowExecutionId),
+                          jobName);
+                    }
+                  }
+                  catch (Exception e) {
+                    log.error("Failed to emit issue event for  flowGroup: {}, 
flowName: {}, flowExecutionId: {}, jobName: {}", flowGroup, flowName, 
flowExecutionId, jobName, e);
+                  }
+                long processDuration = System.currentTimeMillis() - startTime;
+                log.info(
+                    "Processing issues for flowGroup: {}, flowName: {}, 
flowExecutionId: {}, jobName: {}, duration: {} ms",
+                    flowGroup, flowName, flowExecutionId, jobName, 
processDuration);
+                }
+              }
+
             // do not send event if retry is required, because it can alert 
users to re-submit a job that is already set to be retried by GaaS
             this.eventProducer.emitObservabilityEvent(jobStatus);
           }
@@ -326,7 +359,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
         // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
         // And job status reflect the execution status in one attempt
-         if (!isFlowStatusAndPendingResume && (previousStatus != null && 
currentStatus != null && (previousGeneration > currentGeneration || (
+        if (!isFlowStatusAndPendingResume && (previousStatus != null && 
currentStatus != null && (previousGeneration > currentGeneration || (
             previousGeneration == currentGeneration && previousAttempts > 
currentAttempts) || (previousGeneration == currentGeneration && 
previousAttempts == currentAttempts
             && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
             < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus)))))) 
{
@@ -437,4 +470,4 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   protected abstract GobblinTrackingEvent 
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message);
 
   protected abstract org.apache.gobblin.configuration.State 
parseJobStatus(GobblinTrackingEvent event);
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index cb7b3d3bad..86ceaecfc7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.runtime.ErrorClassifier;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
@@ -51,15 +52,17 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
   private final MultiContextIssueRepository issueRepository;
   private final boolean instrumentationEnabled;
   private final DagManagementStateStore dagManagementStateStore;
+  private final ErrorClassifier errorClassifier;
 
   @Inject
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      GobblinInstanceEnvironment env, DagManagementStateStore 
dagManagementStateStore) {
+      GobblinInstanceEnvironment env, DagManagementStateStore 
dagManagementStateStore, ErrorClassifier errorClassifier) {
     this.config = Objects.requireNonNull(config);
     this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
     this.issueRepository = issueRepository;
     this.instrumentationEnabled = env.isInstrumentationEnabled();
     this.dagManagementStateStore = dagManagementStateStore;
+    this.errorClassifier = errorClassifier;
   }
 
   private KafkaJobStatusMonitor createJobStatusMonitor()
@@ -94,7 +97,7 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
 
     return (KafkaJobStatusMonitor) GobblinConstructorUtils
         .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
-            dagManagementStateStore);
+            dagManagementStateStore, errorClassifier);
   }
 
   @Override
@@ -105,4 +108,4 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
       throw new RuntimeException(e);
     }
   }
-}
+}
\ No newline at end of file

Reply via email to