This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 51773abba1 [GOBBLIN-2211] Implement Error Classification based on
execution issues (#4121)
51773abba1 is described below
commit 51773abba115e1bebec413662063286789c712db
Author: NamsB7 <[email protected]>
AuthorDate: Fri Jul 25 15:38:00 2025 +0530
[GOBBLIN-2211] Implement Error Classification based on execution issues
(#4121)
Implemented Error Classification for jobs based on execution issues
Added 2 new tables for ErrorPatternStore along with its in memory
implementation
---
.../gobblin/configuration/ConfigurationKeys.java | 8 +
.../gobblin/configuration/ErrorCategory.java | 58 +--
.../gobblin/configuration/ErrorPatternProfile.java | 55 +-
.../apache/gobblin/service/ServiceConfigKeys.java | 10 +
.../gobblin/metastore/ErrorPatternStore.java | 63 +++
.../metastore/InMemoryErrorPatternStore.java | 180 +++++++
.../gobblin/metastore/MysqlErrorPatternStore.java | 370 ++++++++++++++
.../runtime/KafkaAvroJobStatusMonitorTest.java | 138 ++++-
.../apache/gobblin/runtime/ErrorClassifier.java | 251 +++++++++
.../troubleshooter/InMemoryIssueRepository.java | 11 +-
.../InMemoryMultiContextIssueRepository.java | 20 +-
.../runtime/troubleshooter/IssueRepository.java | 3 +
.../troubleshooter/IssueTestDataProvider.java | 564 +++++++++++++++++++++
.../troubleshooter/JobIssueEventHandler.java | 38 +-
.../MultiContextIssueRepository.java | 6 +
.../troubleshooter/NoopIssueRepository.java | 6 +
.../gobblin/runtime/ErrorClassifierTest.java | 184 +++++++
.../troubleshooter/JobIssueEventHandlerTest.java | 4 +-
.../modules/core/GobblinServiceGuiceModule.java | 11 +-
.../MySqlMultiContextIssueRepository.java | 47 ++
.../monitoring/KafkaAvroJobStatusMonitor.java | 5 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 59 ++-
.../monitoring/KafkaJobStatusMonitorFactory.java | 9 +-
23 files changed, 1983 insertions(+), 117 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 03f1a2629e..195ca295ee 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -85,6 +85,14 @@ public class ConfigurationKeys {
public static final String DATASETURN_STATESTORE_NAME_PARSER =
"state.store.datasetUrnStateStoreNameParser";
+ /**
+ * Configuration properties for the Error Pattern Store.
+ */
+ public static final String ERROR_REGEX_DB_TABLE_KEY =
"state.store.db.regexTableName";
+ public static final String ERROR_CATEGORIES_DB_TABLE_KEY =
"state.store.db.categoriesTableName";
+ public static final String ERROR_REGEX_VARCHAR_SIZE_KEY =
"error.regex.max.varchar.size";
+ public static final String ERROR_CATEGORY_VARCHAR_SIZE_KEY =
"error.category.max.varchar.size";
+ public static final String DEFAULT_DB_ERROR_CATEGORY_KEY =
"error.category.default";
/**
* Job scheduler configuration properties.
*/
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorCategory.java
similarity index 50%
copy from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
copy to
gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorCategory.java
index f47f9085fb..43b927242c 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorCategory.java
@@ -15,47 +15,27 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.configuration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-
-public class NoopIssueRepository implements IssueRepository {
- @Override
- public List<Issue> getAll()
- throws TroubleshooterException {
- return Collections.emptyList();
- }
-
- @Override
- public void put(Issue issue)
- throws TroubleshooterException {
-
- }
-
- @Override
- public void put(Collection<Issue> issues)
- throws TroubleshooterException {
-
- }
-
- @Override
- public void remove(String issueCode)
- throws TroubleshooterException {
-
- }
-
- @Override
- public void removeAll()
- throws TroubleshooterException {
+import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
+/*
+ * Represents an error category used to classify errors within the Gobblin
framework.
+ * Each category has a name and a priority, which can be used to determine the
severity
+ * or importance of the error. This class is primarily used for error
categorisation and handling.
+ */
+@Getter
+@Setter
+public class ErrorCategory implements Serializable {
+ private String categoryName;
+ private int priority;
+
+ public ErrorCategory(String categoryName, int priority) {
+ this.categoryName = categoryName;
+ this.priority = priority;
}
- @Override
- public void replaceAll(Collection<Issue> issues)
- throws TroubleshooterException {
-
- }
}
+
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorPatternProfile.java
similarity index 50%
copy from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
copy to
gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorPatternProfile.java
index f47f9085fb..0b649b4ebf 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ErrorPatternProfile.java
@@ -15,47 +15,24 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.troubleshooter;
+package org.apache.gobblin.configuration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
-public class NoopIssueRepository implements IssueRepository {
- @Override
- public List<Issue> getAll()
- throws TroubleshooterException {
- return Collections.emptyList();
- }
-
- @Override
- public void put(Issue issue)
- throws TroubleshooterException {
-
- }
-
- @Override
- public void put(Collection<Issue> issues)
- throws TroubleshooterException {
-
- }
-
- @Override
- public void remove(String issueCode)
- throws TroubleshooterException {
-
- }
-
- @Override
- public void removeAll()
- throws TroubleshooterException {
-
- }
-
- @Override
- public void replaceAll(Collection<Issue> issues)
- throws TroubleshooterException {
-
+/*
+ * Represents a profile for error patterns, containing a regex to match error
descriptions and a category name for classification.
+ */
+@Getter
+@Setter
+public class ErrorPatternProfile implements Serializable {
+ private String descriptionRegex;
+ private String categoryName;
+
+ public ErrorPatternProfile(String descriptionRegex, String categoryName) {
+ this.descriptionRegex = descriptionRegex;
+ this.categoryName = categoryName;
}
}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 8b6c1df037..f18566c22f 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -160,4 +160,14 @@ public class ServiceConfigKeys {
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
public static final long DEFAULT_FLOW_FINISH_DEADLINE_MILLIS =
TimeUnit.HOURS.toMillis(24);
+
+ public static final String ERROR_PATTERN_STORE_CLASS =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "errorPatternStore.class";
+ public static final String ERROR_CLASSIFICATION_ENABLED_KEY =
"errorClassification.enabled";
+ public static final String ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL_KEY =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX
+"errorClassification.maxErrorsInFinal";
+ public static final int DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL =
10;
+ public static final String ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS_KEY =
GOBBLIN_SERVICE_PREFIX + "errorClassification.maxErrorsToProcess";
+ public static final int DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS =
1000;
+ public static final String ERROR_CLASSIFICATION_DEFAULT_PRIORITY_KEY =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX +
"errorClassification.defaultPriorityValue";
+ public static final int DEFAULT_PRIORITY_VALUE = Integer.MAX_VALUE;
+ public static final String ERROR_PATTERN_STORE_DEFAULT_CATEGORY_KEY =
GOBBLIN_SERVICE_PREFIX + "errorPatternStore.defaultCategory";
}
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/ErrorPatternStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/ErrorPatternStore.java
new file mode 100644
index 0000000000..95e8fcdbda
--- /dev/null
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/ErrorPatternStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * Interface for a store that persists error patterns and their categories.
+ **/
+public interface ErrorPatternStore {
+ void addErrorPattern(ErrorPatternProfile issue)
+ throws IOException;
+
+ boolean deleteErrorPattern(String descriptionRegex)
+ throws IOException;
+
+ ErrorPatternProfile getErrorPattern(String descriptionRegex)
+ throws IOException;
+
+ List<ErrorPatternProfile> getAllErrorPatterns()
+ throws IOException;
+
+ List<ErrorPatternProfile> getErrorPatternsByCategory(String categoryName)
+ throws IOException;
+
+ void addErrorCategory(ErrorCategory errorCategory)
+ throws IOException;
+
+ ErrorCategory getErrorCategory(String categoryName)
+ throws IOException;
+
+ int getErrorCategoryPriority(String categoryName)
+ throws IOException;
+
+ List<ErrorCategory> getAllErrorCategories()
+ throws IOException;
+
+ List<ErrorPatternProfile> getAllErrorPatternsOrderedByCategoryPriority()
+ throws IOException;
+
+ ErrorCategory getDefaultCategory()
+ throws IOException;
+}
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/InMemoryErrorPatternStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/InMemoryErrorPatternStore.java
new file mode 100644
index 0000000000..9be6070e26
--- /dev/null
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/InMemoryErrorPatternStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An in-memory implementation of the ErrorPatternStore interface.
+ * This class is the default implementation, intended for bootstrap and
testing; it does not persist data across application restarts.
+ **/
+@Slf4j
+public class InMemoryErrorPatternStore implements ErrorPatternStore {
+ private final List<ErrorPatternProfile> errorPatterns = new ArrayList<>();
+ private final Map<String, ErrorCategory> categories = new HashMap<>();
+ private ErrorCategory defaultErrorCategory = null;
+
+ private static final String DEFAULT_CATEGORY_NAME = "UNKNOWN";
+
+ private final int defaultPriority;
+
+ @Inject
+ public InMemoryErrorPatternStore(Config config) {
+ ErrorCategory user = new ErrorCategory("USER", 1);
+ this.categories.put(user.getCategoryName(), user);
+
+ this.errorPatterns.add(new ErrorPatternProfile(".*file not found.*",
"USER"));
+ defaultPriority = ConfigUtils.getInt(config,
ServiceConfigKeys.ERROR_CLASSIFICATION_DEFAULT_PRIORITY_KEY,
+ ServiceConfigKeys.DEFAULT_PRIORITY_VALUE);
+
+ String defaultErrorCategoryName =ConfigUtils.getString(config,
ServiceConfigKeys.ERROR_PATTERN_STORE_DEFAULT_CATEGORY_KEY,
DEFAULT_CATEGORY_NAME);
+
+ this.defaultErrorCategory = new ErrorCategory(defaultErrorCategoryName,
defaultPriority);
+ }
+
+ public void upsertCategory(List<ErrorCategory> categories) {
+ for (ErrorCategory errorCategory : categories) {
+ this.categories.put(errorCategory.getCategoryName(), errorCategory);
+ }
+ }
+
+ public void upsertPatterns(List<ErrorPatternProfile> patterns) {
+ // Clear existing patterns and add all new ones
+ this.errorPatterns.clear();
+ this.errorPatterns.addAll(patterns);
+ }
+
+ public void setDefaultCategory(ErrorCategory errorCategory) {
+ this.defaultErrorCategory = errorCategory;
+ }
+
+ @Override
+ public void addErrorPattern(ErrorPatternProfile issue)
+ throws IOException {
+ errorPatterns.add(issue);
+ }
+
+ @Override
+ public boolean deleteErrorPattern(String descriptionRegex)
+ throws IOException {
+ return errorPatterns.removeIf(issue ->
issue.getDescriptionRegex().equals(descriptionRegex));
+ }
+
+ @Override
+ public ErrorPatternProfile getErrorPattern(String descriptionRegex)
+ throws IOException {
+ for (ErrorPatternProfile issue : errorPatterns) {
+ if (issue.getDescriptionRegex().equals(descriptionRegex)) {
+ return issue;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public List<ErrorPatternProfile> getAllErrorPatterns()
+ throws IOException {
+ return new ArrayList<>(errorPatterns);
+ }
+
+ @Override
+ public List<ErrorPatternProfile> getErrorPatternsByCategory(String
categoryName)
+ throws IOException {
+ List<ErrorPatternProfile> result = new ArrayList<>();
+ for (ErrorPatternProfile issue : errorPatterns) {
+ if (issue.getCategoryName() != null &&
issue.getCategoryName().equals(categoryName)) {
+ result.add(issue);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void addErrorCategory(ErrorCategory errorCategory)
+ throws IOException {
+ if (errorCategory != null) {
+ categories.put(errorCategory.getCategoryName(), errorCategory);
+ }
+ }
+
+ @Override
+ public ErrorCategory getErrorCategory(String categoryName)
+ throws IOException {
+ return categories.get(categoryName);
+ }
+
+ @Override
+ public int getErrorCategoryPriority(String categoryName)
+ throws IOException {
+ ErrorCategory errorCategory = getErrorCategory(categoryName);
+ if (errorCategory != null) {
+ return errorCategory.getPriority();
+ }
+ throw new IOException("ErrorCategory not found: " + categoryName);
+ }
+
+ @Override
+ public List<ErrorCategory> getAllErrorCategories()
+ throws IOException {
+ return new ArrayList<>(categories.values());
+ }
+
+ @Override
+ public ErrorCategory getDefaultCategory()
+ throws IOException {
+ if (defaultErrorCategory == null) {
+ defaultErrorCategory = new ErrorCategory(DEFAULT_CATEGORY_NAME,
defaultPriority);
+ }
+ return defaultErrorCategory;
+ }
+
+ @Override
+ public List<ErrorPatternProfile>
getAllErrorPatternsOrderedByCategoryPriority()
+ throws IOException {
+ errorPatterns.sort((issue1, issue2) -> {
+ ErrorCategory cat1 = categories.get(issue1.getCategoryName());
+ ErrorCategory cat2 = categories.get(issue2.getCategoryName());
+ if (cat1 == null && cat2 == null) {
+ return 0;
+ }
+ if (cat1 == null) {
+ return 1;
+ }
+ if (cat2 == null) {
+ return -1;
+ }
+ return Integer.compare(cat1.getPriority(), cat2.getPriority());
+ });
+ return errorPatterns;
+ }
+}
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlErrorPatternStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlErrorPatternStore.java
new file mode 100644
index 0000000000..e63d419d2c
--- /dev/null
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlErrorPatternStore.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.configuration.ErrorCategory;
+
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+import javax.sql.DataSource;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * MySQL-backed implementation of ErrorPatternStore.
+ *
+ * This class provides methods to primarily retrieve error regex patterns and
error categories.
+ * There are also methods to add and delete, which should be used with
caution, and retrieve error patterns and categories.
+ *
+ * Expected table schemas:
+ *
+ * 1. error_summary_regex_store
+ * - description_regex: VARCHAR(255) NOT NULL UNIQUE (NOTE(review): the
CREATE TABLE statement in this class omits the UNIQUE constraint, yet the
upsert relies on it via ON DUPLICATE KEY UPDATE — confirm the constraint
exists in deployed schemas)
+ * - error_category_name: VARCHAR(255) NOT NULL
+ *
+ * 2. error_categories
+ * - error_category_name: VARCHAR(255) PRIMARY KEY
+ * - priority: INT UNIQUE NOT NULL
+ *
+ * Default Category Configuration:
+ * - The default category can be configured via the property:
error.category.default (ConfigurationKeys.DEFAULT_DB_ERROR_CATEGORY_KEY)
+ * - The configured value must exactly match an existing error_category_name
in the error_categories table
+ * - If no default is configured, the category with the lowest priority
(i.e. the highest numeric priority value) is returned; an IOException is
thrown if the error_categories table is empty
+ * - If a configured default category is not found in the database, an
IOException will be thrown
+ *
+ **/
+@Slf4j
+public class MysqlErrorPatternStore implements ErrorPatternStore {
+ private final DataSource dataSource;
+ private final String errorRegexSummaryStoreTable;
+ private final String errorCategoriesTable;
+ public static final String CONFIG_PREFIX = "MysqlErrorPatternStore";
+
+ private static final int DEFAULT_MAX_CHARACTERS_IN_SQL_DESCRIPTION_REGEX =
300;
+ private static final int DEFAULT_MAX_CHARACTERS_IN_SQL_CATEGORY_NAME = 255;
+ private final int maxCharactersInSqlDescriptionRegex;
+ private final int maxCharactersInSqlCategoryName;
+
+ private static final String CREATE_ERROR_REGEX_SUMMARY_STORE_TABLE_STATEMENT
=
+ "CREATE TABLE IF NOT EXISTS %s (description_regex VARCHAR(%d) NOT NULL,
error_category_name VARCHAR(%d) NOT NULL)";
+
+ private static final String CREATE_ERROR_CATEGORIES_TABLE_NAME =
+ "CREATE TABLE IF NOT EXISTS %s (error_category_name VARCHAR(%d) PRIMARY
KEY, priority INT UNIQUE NOT NULL)";
+
+ private static final String INSERT_ERROR_CATEGORY_STATEMENT = "INSERT INTO
%s (error_category_name, priority) "
+ + "VALUES (?, ?) ON DUPLICATE KEY UPDATE priority=VALUES(priority)";
+
+ private static final String GET_ERROR_CATEGORY_STATEMENT =
+ "SELECT error_category_name, priority FROM %s WHERE error_category_name
= ?";
+
+ private static final String GET_ALL_ERROR_CATEGORIES_STATEMENT =
+ "SELECT error_category_name, priority FROM %s ORDER BY priority ASC";
+
+ private static final String INSERT_ERROR_REGEX_SUMMARY_STATEMENT =
+ "INSERT INTO %s (description_regex, error_category_name) "
+ + "VALUES (?, ?) ON DUPLICATE KEY UPDATE
error_category_name=VALUES(error_category_name)";
+
+ private static final String DELETE_ERROR_REGEX_SUMMARY_STATEMENT = "DELETE
FROM %s WHERE description_regex = ?";
+
+ private static final String GET_ERROR_REGEX_SUMMARY_STATEMENT =
+ "SELECT description_regex, error_category_name FROM %s WHERE
description_regex = ?";
+
+ private static final String GET_ERROR_PATTERN_BY_CATEGORY_STATEMENT =
"SELECT description_regex, error_category_name FROM %s "
+ + " WHERE error_category_name = ?";
+
+ private static final String GET_ALL_ERROR_REGEX_SUMMARIES_STATEMENT =
+ "SELECT description_regex, error_category_name FROM %s";
+
+ // This SQL statement retrieves the category with the lowest priority
(highest priority value).
+ private static final String GET_LOWEST_PRIORITY_ERROR_CATEGORY_STATEMENT =
+ "SELECT error_category_name, priority FROM %s ORDER BY priority DESC
LIMIT 1";
+
+ private static final String
GET_ALL_ERROR_ISSUES_ORDERED_BY_CATEGORY_PRIORITY_STATEMENT =
+ "SELECT e.description_regex, e.error_category_name FROM %s e "
+ + "JOIN %s c ON e.error_category_name = c.error_category_name "
+ + "ORDER BY c.priority ASC";
+
+ private final String configuredDefaultCategoryName;
+
+ @Inject
+ public MysqlErrorPatternStore(Config config)
+ throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for
MysqlErrorPatternStore");
+ }
+ this.errorRegexSummaryStoreTable =
+ ConfigUtils.getString(config,
ConfigurationKeys.ERROR_REGEX_DB_TABLE_KEY, "error_summary_regex_store");
+ this.errorCategoriesTable =
+ ConfigUtils.getString(config,
ConfigurationKeys.ERROR_CATEGORIES_DB_TABLE_KEY, "error_categories");
+ this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+
+ this.maxCharactersInSqlDescriptionRegex = ConfigUtils.getInt(config,
ConfigurationKeys.ERROR_REGEX_VARCHAR_SIZE_KEY,
+ DEFAULT_MAX_CHARACTERS_IN_SQL_DESCRIPTION_REGEX);
+
+ this.maxCharactersInSqlCategoryName = ConfigUtils.getInt(config,
ConfigurationKeys.ERROR_CATEGORY_VARCHAR_SIZE_KEY,
+ DEFAULT_MAX_CHARACTERS_IN_SQL_CATEGORY_NAME);
+
+ // Use ConfigurationKeys for the default category config key
+ this.configuredDefaultCategoryName =
+ ConfigUtils.getString(config,
ConfigurationKeys.DEFAULT_DB_ERROR_CATEGORY_KEY, null);
+
+ createTablesIfNotExist();
+ }
+
+ private void createTablesIfNotExist()
+ throws IOException {
+ try (Connection connection = dataSource.getConnection()) {
+ try (PreparedStatement createRegexTable = connection.prepareStatement(
+ String.format(CREATE_ERROR_REGEX_SUMMARY_STORE_TABLE_STATEMENT,
errorRegexSummaryStoreTable,
+ maxCharactersInSqlDescriptionRegex,
maxCharactersInSqlCategoryName))) {
+ createRegexTable.executeUpdate();
+ }
+
+ // Create error_categories table
+ try (PreparedStatement createCategoriesTable =
connection.prepareStatement(
+ String.format(CREATE_ERROR_CATEGORIES_TABLE_NAME,
errorCategoriesTable, maxCharactersInSqlCategoryName))) {
+ createCategoriesTable.executeUpdate();
+ }
+
+ // Commit both changes together
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Failed to create tables for storing
ErrorPatterns", e);
+ }
+ }
+
+ @Override
+ public void addErrorCategory(ErrorCategory errorCategory)
+ throws IOException {
+ String sql = String.format(INSERT_ERROR_CATEGORY_STATEMENT,
errorCategoriesTable);
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(sql)) {
+ ps.setString(1, errorCategory.getCategoryName());
+ ps.setInt(2, errorCategory.getPriority());
+ ps.executeUpdate();
+ conn.commit();
+ } catch (SQLException e) {
+ throw new IOException("Failed to add errorCategory", e);
+ }
+ }
+
+ @Override
+ public ErrorCategory getErrorCategory(String categoryName)
+ throws IOException {
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(GET_ERROR_CATEGORY_STATEMENT, errorCategoriesTable))) {
+ ps.setString(1, categoryName);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return new ErrorCategory(rs.getString(1), rs.getInt(2));
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get category", e);
+ }
+ return null;
+ }
+
+ @Override
+ public int getErrorCategoryPriority(String categoryName)
+ throws IOException {
+ ErrorCategory cat = getErrorCategory(categoryName);
+ if (cat == null) {
+ throw new IOException("ErrorCategory not found: " + categoryName);
+ }
+ return cat.getPriority();
+ }
+
+ @Override
+ public List<ErrorCategory> getAllErrorCategories()
+ throws IOException {
+ List<ErrorCategory> categories = new ArrayList<>();
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(GET_ALL_ERROR_CATEGORIES_STATEMENT,
errorCategoriesTable)); ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ categories.add(new ErrorCategory(rs.getString(1), rs.getInt(2)));
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get all categories", e);
+ }
+ return categories;
+ }
+
+ @Override
+ public void addErrorPattern(ErrorPatternProfile issue)
+ throws IOException {
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(INSERT_ERROR_REGEX_SUMMARY_STATEMENT,
errorRegexSummaryStoreTable))) {
+ ps.setString(1, issue.getDescriptionRegex());
+ ps.setString(2, issue.getCategoryName());
+ ps.executeUpdate();
+ conn.commit();
+ } catch (SQLException e) {
+ throw new IOException("Failed to add issue", e);
+ }
+ }
+
+ @Override
+ public boolean deleteErrorPattern(String descriptionRegex)
+ throws IOException {
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(DELETE_ERROR_REGEX_SUMMARY_STATEMENT,
errorRegexSummaryStoreTable))) {
+ ps.setString(1, descriptionRegex);
+ int rows = ps.executeUpdate();
+ conn.commit();
+ return rows > 0;
+ } catch (SQLException e) {
+ throw new IOException("Failed to delete issue", e);
+ }
+ }
+
+ @Override
+ public ErrorPatternProfile getErrorPattern(String descriptionRegex)
+ throws IOException {
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(GET_ERROR_REGEX_SUMMARY_STATEMENT,
errorRegexSummaryStoreTable))) {
+ ps.setString(1, descriptionRegex);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return new ErrorPatternProfile(rs.getString(1), rs.getString(2));
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get issue", e);
+ }
+ return null;
+ }
+
+ @Override
+ public List<ErrorPatternProfile> getAllErrorPatterns()
+ throws IOException {
+ List<ErrorPatternProfile> issues = new ArrayList<>();
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(GET_ALL_ERROR_REGEX_SUMMARIES_STATEMENT,
errorRegexSummaryStoreTable));
+ ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ issues.add(new ErrorPatternProfile(rs.getString(1), rs.getString(2)));
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get all issues", e);
+ }
+ return issues;
+ }
+
+ @Override
+ public List<ErrorPatternProfile> getErrorPatternsByCategory(String
categoryName)
+ throws IOException {
+ List<ErrorPatternProfile> issues = new ArrayList<>();
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(GET_ERROR_PATTERN_BY_CATEGORY_STATEMENT,
errorRegexSummaryStoreTable)
+ )) {
+ ps.setString(1, categoryName);
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ issues.add(new ErrorPatternProfile(rs.getString(1),
rs.getString(2)));
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get issues by category", e);
+ }
+ return issues;
+ }
+
+ @Override
+ public ErrorCategory getDefaultCategory()
+ throws IOException {
+ // 1. Try to use the configured default category name if set
+ if (StringUtils.isNotBlank(configuredDefaultCategoryName)) {
+ ErrorCategory cat = getErrorCategory(configuredDefaultCategoryName);
+ if (cat != null) {
+ return cat;
+ } else {
+ // Throw exception if configured category doesn't exist
+ throw new IOException(String.format(
+ "Configured default category '%s' not found in database",
+ configuredDefaultCategoryName));
+ }
+ }
+
+ // 2. No configuration provided - use lowest priority category as fallback
+ ErrorCategory lowestPriorityCategory = getLowestPriorityCategory();
+ if (lowestPriorityCategory == null) {
+ throw new IOException("No error categories found in database");
+ }
+
+ log.info("No default category configured, using lowest priority category:
{}",
+ lowestPriorityCategory.getCategoryName());
+ return lowestPriorityCategory;
+ }
+
+ /**
+ * Returns the category with the lowest priority, i.e. the highest priority
value (descending order).
+ */
+ private ErrorCategory getLowestPriorityCategory()
+ throws IOException {
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(
+ String.format(GET_LOWEST_PRIORITY_ERROR_CATEGORY_STATEMENT,
errorCategoriesTable))) {
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return new ErrorCategory(rs.getString(1), rs.getInt(2));
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get category", e);
+ }
+ return null;
+ }
+
+ /**
+ * Returns all ErrorPatternProfiles ordered by the priority of their
category (ascending). No secondary ordering is applied.
+ */
+ @Override
+ public List<ErrorPatternProfile>
getAllErrorPatternsOrderedByCategoryPriority()
+ throws IOException {
+ List<ErrorPatternProfile> issues = new ArrayList<>();
+ String sql =
String.format(GET_ALL_ERROR_ISSUES_ORDERED_BY_CATEGORY_PRIORITY_STATEMENT,
errorRegexSummaryStoreTable,
+ errorCategoriesTable);
+ try (Connection conn = dataSource.getConnection(); PreparedStatement ps =
conn.prepareStatement(sql);
+ ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ issues.add(new ErrorPatternProfile(rs.getString(1), rs.getString(2)));
+ }
+ } catch (SQLException e) {
+ throw new IOException("Failed to get all issues ordered by category
priority", e);
+ }
+ return issues;
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 06fb1f9dbc..ebdbbb73ca 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -34,10 +34,12 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
@@ -55,10 +57,12 @@ import kafka.message.MessageAndMetadata;
import lombok.Getter;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.metastore.InMemoryErrorPatternStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.FlowStatus;
import org.apache.gobblin.metrics.GaaSFlowObservabilityEvent;
@@ -72,9 +76,12 @@ import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
import org.apache.gobblin.metrics.kafka.Pusher;
import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.IssueTestDataProvider;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
@@ -86,7 +93,6 @@ import
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProduc
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -105,6 +111,8 @@ public class KafkaAvroJobStatusMonitorTest {
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
DagActionStore.DagAction enforceJobStartDeadlineDagAction;
+ private JobIssueEventHandler eventHandler;
+ private ErrorClassifier errorClassifier;
@BeforeClass
public void setUp() throws Exception {
@@ -128,6 +136,12 @@ public class KafkaAvroJobStatusMonitorTest {
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
}
+ @BeforeMethod
+ public void setUpMethod() {
+ eventHandler = Mockito.mock(JobIssueEventHandler.class);
+ errorClassifier = Mockito.mock(ErrorClassifier.class);
+ }
+
@Test
public void testProcessMessageForSuccessfulFlow() throws IOException,
ReflectiveOperationException {
DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
@@ -152,7 +166,7 @@ public class KafkaAvroJobStatusMonitorTest {
Thread.currentThread().interrupt();
}
MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
- new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
+ new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator =
Iterators.transform(
@@ -194,6 +208,115 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
  /**
   * Functional test for ErrorClassifier integration with KafkaJobStatusMonitor:
   * Tests that when a job fails, the monitor properly invokes ErrorClassifier to classify issues
   * and logs the final classified error through the JobIssueEventHandler.
   */
  @Test (dependsOnMethods = "testObservabilityEventFlowFailed")
  public void testErrorClassifierFinalIssueSystemInfra()
      throws IOException, ReflectiveOperationException, TroubleshooterException {
    DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
    // Classification must be explicitly enabled for the monitor to consult the ErrorClassifier.
    Config config = ConfigFactory.empty().withValue(ServiceConfigKeys.ERROR_CLASSIFICATION_ENABLED_KEY, ConfigValueFactory.fromAnyRef(Boolean.TRUE));
    List<ErrorPatternProfile> errorPatternProfiles = IssueTestDataProvider.getSortedPatterns();

    // Seed an in-memory pattern store with the shared test categories/patterns and a default category.
    InMemoryErrorPatternStore store = new InMemoryErrorPatternStore(config);
    store.upsertCategory(IssueTestDataProvider.TEST_CATEGORIES);
    store.upsertPatterns(errorPatternProfiles);
    store.setDefaultCategory(IssueTestDataProvider.TEST_DEFAULT_ERROR_CATEGORY);

    // Real classifier (not the @BeforeMethod mock) so actual pattern matching is exercised.
    this.errorClassifier = new ErrorClassifier(store, config);
    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic2");

    //Submit GobblinTrackingEvents to Kafka
    ImmutableList.of(
        createJobFailedEvent()
    ).forEach(event -> {
      context.submitEvent(event);
      kafkaReporter.report();
    });

    //Set issues for error classification
    Mockito.doReturn(IssueTestDataProvider.testSystemInfraCategoryIssues()).when(eventHandler).getErrorListForClassification(Mockito.any());

    // Give the Kafka reporter time to flush the submitted event before consuming it.
    try {
      Thread.sleep(1000L);
    } catch(InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
        config, new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
    jobStatusMonitor.buildMetricsContextAndMetrics();

    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));

    // Verify that the JobIssueEventHandler was called with the final classified error:
    // the stubbed issues match SYSTEM INFRA patterns, so the final issue summary must name that category.
    Mockito.verify(eventHandler, Mockito.times(1))
        .logFinalError(Mockito.argThat(issue ->
            issue.getSummary().contains("SYSTEM INFRA")),
            Mockito.eq(this.flowName),
            Mockito.eq(this.flowGroup),
            Mockito.eq(String.valueOf(this.flowExecutionId)),
            Mockito.eq(this.jobName));

    jobStatusMonitor.shutDown();
  }
+
  /**
   * Test for verifying that error classifier module is not called when it is disabled.
   */
  @Test (dependsOnMethods = "testErrorClassifierFinalIssueSystemInfra")
  public void testDisabledErrorClassification()
      throws IOException, ReflectiveOperationException, TroubleshooterException {
    DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
    // Classification explicitly disabled — the monitor should never touch the classifier path.
    Config config = ConfigFactory.empty().withValue(ServiceConfigKeys.ERROR_CLASSIFICATION_ENABLED_KEY, ConfigValueFactory.fromAnyRef(Boolean.FALSE));
    List<ErrorPatternProfile> errorPatternProfiles = IssueTestDataProvider.getSortedPatterns();

    // Store is still fully populated to prove the disable flag (not missing data) suppresses classification.
    InMemoryErrorPatternStore store = new InMemoryErrorPatternStore(config);
    store.upsertCategory(IssueTestDataProvider.TEST_CATEGORIES);
    store.upsertPatterns(errorPatternProfiles);
    store.setDefaultCategory(IssueTestDataProvider.TEST_DEFAULT_ERROR_CATEGORY);

    this.errorClassifier = new ErrorClassifier(store, config);
    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic2");

    //Submit GobblinTrackingEvents to Kafka
    ImmutableList.of(
        createJobFailedEvent()
    ).forEach(event -> {
      context.submitEvent(event);
      kafkaReporter.report();
    });

    //Set issues for error classification
    Mockito.doReturn(IssueTestDataProvider.testSystemInfraCategoryIssues()).when(eventHandler).getErrorListForClassification(Mockito.any());

    // Give the Kafka reporter time to flush the submitted event before consuming it.
    try {
      Thread.sleep(1000L);
    } catch(InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
        config, new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
    jobStatusMonitor.buildMetricsContextAndMetrics();

    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
    jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));

    // Verify that the JobIssueEventHandler was not called for error classification:
    // neither the final-error sink nor the issue-fetch hook may be touched when disabled.
    Mockito.verify(eventHandler, Mockito.times(0))
        .logFinalError(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
    Mockito.verify(eventHandler, Mockito.times(0))
        .getErrorListForClassification(Mockito.any());

    jobStatusMonitor.shutDown();
  }
+
@Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
public void testProcessMessageForFailedFlow() throws IOException,
ReflectiveOperationException {
DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
@@ -309,7 +432,7 @@ public class KafkaAvroJobStatusMonitorTest {
createFlowCompiledEvent(),
createJobOrchestratedEvent(1, 2),
createJobSkippedEvent()
- ).forEach(event -> {
+ ).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});
@@ -950,14 +1073,15 @@ public class KafkaAvroJobStatusMonitorTest {
}
MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
- GaaSJobObservabilityEventProducer eventProducer, DagManagementStateStore
dagManagementStateStore) throws IOException, ReflectiveOperationException {
+ GaaSJobObservabilityEventProducer eventProducer, DagManagementStateStore
dagManagementStateStore)
+ throws IOException, ReflectiveOperationException {
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"))
.withFallback(additionalConfig);
return new MockKafkaAvroJobStatusMonitor("test", config, 1,
shouldThrowFakeExceptionInParseJobStatusToggle,
- eventProducer, dagManagementStateStore);
+ eventProducer, dagManagementStateStore, errorClassifier, eventHandler);
}
/**
* Create a dummy event to test if it is filtered out by the consumer.
@@ -1013,9 +1137,9 @@ public class KafkaAvroJobStatusMonitorTest {
*/
    // Delegates to the real monitor, adding a toggle that makes parseJobStatus throw on demand
    // plus injectable ErrorClassifier/JobIssueEventHandler so tests can mock or substitute them.
    public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, GaaSJobObservabilityEventProducer producer,
        DagManagementStateStore dagManagementStateStore, ErrorClassifier errorClassifier, JobIssueEventHandler eventHandler)
        throws IOException, ReflectiveOperationException {
      super(topic, config, numThreads, eventHandler, producer, dagManagementStateStore, errorClassifier);
      shouldThrowFakeExceptionInParseJobStatus = shouldThrowFakeExceptionInParseJobStatusToggle;
    }
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ErrorClassifier.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ErrorClassifier.java
new file mode 100644
index 0000000000..d0f5fde65c
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ErrorClassifier.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.metastore.ErrorPatternStore;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Classifies issues by matching their summary description to error patterns
and categories.
+ * Categorisation is based on regex patterns and their associated categories.
+ * Each category has an associated priority value.
+ */
+@Slf4j
+public class ErrorClassifier {
+ private final List<CompiledErrorPattern> errorIssues;
+ private final Map<String, ErrorCategory> categoryMap;
+ private ErrorPatternStore errorStore = null;
+
+ private final int maxErrorsInFinalIssue;
+ private static final String FINAL_ISSUE_ERROR_CODE = "T0000";
+ private ErrorCategory defaultErrorCategory = null;
+
+ /**
+ * Loads all error issues and categories from the store into memory.
+ */
+ @Inject
+ public ErrorClassifier(ErrorPatternStore store, Config config)
+ throws IOException {
+ this.errorStore = store;
+
+ this.maxErrorsInFinalIssue =
+ ConfigUtils.getInt(config,
ServiceConfigKeys.ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL_KEY,
+
ServiceConfigKeys.DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_IN_FINAL);
+
+ this.categoryMap = new HashMap<>();
+ for (ErrorCategory cat : this.errorStore.getAllErrorCategories()) {
+ categoryMap.put(cat.getCategoryName(), cat);
+ }
+
+ this.errorIssues = new ArrayList<>();
+ for (ErrorPatternProfile errorPattern :
this.errorStore.getAllErrorPatternsOrderedByCategoryPriority()) {
+ errorIssues.add(new CompiledErrorPattern(errorPattern));
+ }
+
+ try {
+ this.defaultErrorCategory = this.errorStore.getDefaultCategory();
+ }
+ catch (IOException e) {
+ log.warn("Failed to load default error category from store, will not use
default category for classification", e);
+ this.defaultErrorCategory = null;
+ }
+ }
+
+
+ /**
+ * Classifies a list of issues and returns the highest priority category
with its matched issues.
+ * If no issues match, returns null.
+ * If defaultErrorCategory is set, it will be used for unmatched issues.
+ */
+ public Issue classifyEarlyStopWithDefault(List<Issue> issues) {
+ if (issues == null || issues.isEmpty()) {
+ return null;
+ }
+
+ ClassificationResult result = performClassification(issues);
+
+ if (result.highestCategoryName == null) {
+ return null;
+ }
+
+ return buildFinalIssue(result.highestCategoryName,
result.categoryToIssues);
+ }
+
+ private ClassificationResult performClassification(List<Issue> issues) {
+ ClassificationResult result = new ClassificationResult();
+
+ for (Issue issue : issues) {
+ classifySingleIssue(issue, result);
+ }
+
+ applyDefaultCategoryIfNeeded(result);
+ return result;
+ }
+
+ private void classifySingleIssue(Issue issue, ClassificationResult result) {
+ ErrorCategory matchedErrorCategory = findBestMatchingCategory(issue,
result.highestPriority);
+
+ if (matchedErrorCategory != null) {
+ addMatchedIssue(issue, matchedErrorCategory, result);
+ } else {
+ addUnmatchedIssue(issue, result);
+ }
+ }
+
+ private ErrorCategory findBestMatchingCategory(Issue issue, Integer
currentHighestPriority) {
+ for (CompiledErrorPattern pei : errorIssues) {
+ ErrorCategory cat = categoryMap.get(pei.getCategoryName());
+ if (cat == null) {
+ continue;
+ }
+
+ // Early stop optimization - skip categories with lower priority
+ if (currentHighestPriority != null && cat.getPriority() >
currentHighestPriority) {
+ break;
+ }
+
+ if (pei.matches(issue.getSummary())) {
+ return cat;
+ }
+ }
+ return null;
+ }
+
+ private void addMatchedIssue(Issue issue, ErrorCategory errorCategory,
ClassificationResult result) {
+ result.categoryToIssues.computeIfAbsent(errorCategory.getCategoryName(), k
-> new ArrayList<>()).add(issue);
+
+ updateHighestPriorityIfNeeded(errorCategory, result);
+ }
+
+ private void addUnmatchedIssue(Issue issue, ClassificationResult result) {
+ result.unmatched.add(issue);
+
+ // Initialize default priority only once when we encounter the first
unmatched issue
+ if (result.defaultPriority == null && defaultErrorCategory != null) {
+ result.defaultPriority = defaultErrorCategory.getPriority();
+ // Only update highest priority if no category has been matched yet OR
if default category has higher priority (lower number)
+ if (result.highestPriority == null || defaultErrorCategory.getPriority()
< result.highestPriority) {
+ result.highestPriority = result.defaultPriority;
+ result.highestCategoryName = defaultErrorCategory.getCategoryName();
+ }
+ }
+ }
+
+ private void updateHighestPriorityIfNeeded(ErrorCategory errorCategory,
ClassificationResult result) {
+ if (result.highestPriority == null || errorCategory.getPriority() <
result.highestPriority) {
+ result.highestPriority = errorCategory.getPriority();
+ result.highestCategoryName = errorCategory.getCategoryName();
+ }
+ }
+
+ private void applyDefaultCategoryIfNeeded(ClassificationResult result) {
+ boolean shouldUseDefault = result.highestPriority != null &&
result.highestPriority.equals(result.defaultPriority)
+ && !result.unmatched.isEmpty() && defaultErrorCategory != null;
+
+ if (shouldUseDefault) {
+ result.highestCategoryName = defaultErrorCategory.getCategoryName();
+
+ for (Issue issue : result.unmatched) {
+
result.categoryToIssues.computeIfAbsent(defaultErrorCategory.getCategoryName(),
k -> new ArrayList<>())
+ .add(issue);
+ }
+ }
+ }
+
+ private Issue buildFinalIssue(String categoryName, Map<String, List<Issue>>
categoryToIssues) {
+ List<Issue> matchedIssues = categoryToIssues.get(categoryName);
+ String details = buildDetailsString(matchedIssues);
+
+ return Issue.builder().summary("ErrorCategory: " +
categoryName).details(details).severity(IssueSeverity.ERROR)
+
.time(ZonedDateTime.now()).code(FINAL_ISSUE_ERROR_CODE).sourceClass(null).exceptionClass(null).properties(null)
+ .build();
+ }
+
+ private String buildDetailsString(List<Issue> issues) {
+ List<String> summaries = new ArrayList<>();
+ int limit = Math.min(maxErrorsInFinalIssue, issues.size());
+
+ for (int i = 0; i < limit; i++) {
+ summaries.add(issues.get(i).getSummary());
+ }
+
+ return String.join(" \n ", summaries);
+ }
+
+ /**
+ * Helper class that stores the result of issue classification, including
matched categories, unmatched issues, and priority information.
+ */
+ private static class ClassificationResult {
+ Map<String, List<Issue>> categoryToIssues = new HashMap<>();
+ List<Issue> unmatched = new ArrayList<>();
+ Integer highestPriority = null;
+ String highestCategoryName = null;
+ Integer defaultPriority = null;
+ }
+
+ /**
+ * Helper class to store compiled regex for fast matching.
+ */
+ private static class CompiledErrorPattern {
+ private final ErrorPatternProfile issue;
+ private final Pattern pattern;
+
+ CompiledErrorPattern(ErrorPatternProfile issue) {
+ this.issue = issue;
+ try {
+ this.pattern = Pattern.compile(issue.getDescriptionRegex(),
Pattern.CASE_INSENSITIVE);
+ } catch (PatternSyntaxException e) {
+ log.error("Invalid regex pattern for issue: {}. Error: {}",
issue.getDescriptionRegex(), e.getMessage());
+ throw new IllegalArgumentException("Invalid regex pattern: " +
issue.getDescriptionRegex(), e);
+ }
+ }
+
+ boolean matches(String summary) {
+ if (summary == null || summary.isEmpty()) {
+ return false;
+ }
+ return pattern.matcher(summary).find();
+ }
+
+ String getCategoryName() {
+ return issue.getCategoryName();
+ }
+ }
+}
+
+
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
index e9fdebf6a0..1ae6f6f291 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryIssueRepository.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -64,6 +65,14 @@ public class InMemoryIssueRepository implements
IssueRepository {
return new ArrayList<>(issues.values());
}
+ @Override
+ public synchronized List<Issue> getMostRecentErrors(int limit)
+ throws TroubleshooterException {
+
+ return issues.values().stream().filter(issue -> issue.getSeverity() ==
IssueSeverity.ERROR)
+ .sorted((a, b) ->
b.getTime().compareTo(a.getTime())).limit(limit).collect(Collectors.toList());
+ }
+
@Override
public synchronized void put(Issue issue)
throws TroubleshooterException {
@@ -72,7 +81,7 @@ public class InMemoryIssueRepository implements
IssueRepository {
if (!reportedOverflow) {
reportedOverflow = true;
log.warn("In-memory issue repository has {} elements and is now full.
New issues will be ignored.",
- issues.size());
+ issues.size());
}
return;
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
index 530798d873..138f98ed42 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/InMemoryMultiContextIssueRepository.java
@@ -70,11 +70,23 @@ public class InMemoryMultiContextIssueRepository extends
AbstractIdleService imp
return Collections.emptyList();
}
+ @Override
+ public synchronized List<Issue> getMostRecentErrors(String contextId, int
limit)
+ throws TroubleshooterException {
+
+ InMemoryIssueRepository issueRepository =
contextIssues.getOrDefault(contextId, null);
+
+ if (issueRepository != null) {
+ return issueRepository.getMostRecentErrors(limit);
+ }
+ return Collections.emptyList();
+ }
+
  // Stores a single issue under its context, lazily creating the per-context repository
  // bounded by the configured maximum issues per context.
  @Override
  public synchronized void put(String contextId, Issue issue)
      throws TroubleshooterException {

    InMemoryIssueRepository issueRepository = contextIssues.computeIfAbsent(contextId,
        s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));
    issueRepository.put(issue);
  }
@@ -83,8 +95,8 @@ public class InMemoryMultiContextIssueRepository extends
AbstractIdleService imp
  // Batch variant: stores all issues under the context, lazily creating the per-context
  // repository bounded by the configured maximum issues per context.
  public synchronized void put(String contextId, List<Issue> issues)
      throws TroubleshooterException {

    InMemoryIssueRepository issueRepository = contextIssues.computeIfAbsent(contextId,
        s -> new InMemoryIssueRepository(configuration.getMaxIssuesPerContext()));
    issueRepository.put(issues);
  }
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
index 61f19eeba8..5741b9d662 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java
@@ -35,6 +35,9 @@ public interface IssueRepository {
List<Issue> getAll()
throws TroubleshooterException;
+ List<Issue> getMostRecentErrors(int limit)
+ throws TroubleshooterException;
+
/**
* Saves an issue to the repository, if it is not yet present.
*
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueTestDataProvider.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueTestDataProvider.java
new file mode 100644
index 0000000000..8894619d5b
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueTestDataProvider.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+
+import static java.util.Collections.singletonMap;
+
+@Slf4j
+public class IssueTestDataProvider {
+
+ // Test categories
+ public static final List<ErrorCategory> TEST_CATEGORIES = Arrays.asList(
+ new ErrorCategory("USER", 1),
+ new ErrorCategory("SYSTEM INFRA", 2),
+ new ErrorCategory("GAAS", 3),
+ new ErrorCategory("MISC", 4),
+ new ErrorCategory("UNKNOWN", 5),
+ new ErrorCategory("NON FATAL", 6)
+ );
+
+ public static final ErrorCategory TEST_DEFAULT_ERROR_CATEGORY = new
ErrorCategory("UNKNOWN", 5);
+
+ // Test patterns - you'll need to add all the patterns here
+ public static final List<ErrorPatternProfile> TEST_PATTERNS = Arrays.asList(
+ new ErrorPatternProfile(".*the namespace quota.*", "USER"),
+ new ErrorPatternProfile(".*permission denied.*", "USER"),
+ new ErrorPatternProfile(".*remoteexception.*read only mount point.*",
"USER"),
+ new ErrorPatternProfile(".*remoteexception: operation category write
is not supported in state observer.*", "USER"),
+ new ErrorPatternProfile(".*invalidoperationexception: alter is not
possible.*", "USER"),
+ new ErrorPatternProfile(".*the diskspace quota.*", "USER"),
+ new ErrorPatternProfile(".*filenotfoundexception:.*", "USER"),
+ new ErrorPatternProfile(".*does not exist.*", "USER"),
+ new ErrorPatternProfile("remoteexception: file/directory.*does not
exist", "USER"),
+ new ErrorPatternProfile("runtimeexception: directory not found.*",
"USER"),
+ new ErrorPatternProfile(".*icebergtable.tablenotfoundexception.*",
"USER"),
+ new ErrorPatternProfile("ioexception: topath.*must be an ancestor of
frompath.*", "USER"),
+ new ErrorPatternProfile("ioexception: origin path.*does not exist.*",
"USER"),
+ new ErrorPatternProfile("illegalargumentexception: missing source
iceberg table.*", "USER"),
+ new ErrorPatternProfile("remoteexception:.*already exists \\| failed
to send re-scaling directive.*", "USER"),
+ new ErrorPatternProfile("illegalargumentexception: when replacing
prefix, all locations must be descendants of the prefix.*", "GAAS"),
+ new ErrorPatternProfile("illegalargumentexception:.*", "MISC"),
+ new ErrorPatternProfile(".*remoteexception: server too busy.*",
"SYSTEM INFRA"),
+ new ErrorPatternProfile("jschexception: session is down.*", "SYSTEM
INFRA"),
+ new ErrorPatternProfile("jschexception: channel request.*", "SYSTEM
INFRA"),
+ new ErrorPatternProfile(".*safemodeexception.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("remoteexception: no namenode available to
invoke.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("remoteexception:
org.apache.hadoop.hdfs.server.federation.router.nonamenodesavailableexception:
no namenodes available under nameservice.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*no namenodes available under
nameservice.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*sockettimeoutexception.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*socketexception: connection reset.*",
"SYSTEM INFRA"),
+ new ErrorPatternProfile("timeoutexception: failed to update metadata
after 60000 ms.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("networkexception: the server disconnected
before a response was received.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("webclientrequestwithmessageexception:
handshake timed out after 10000ms.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("timeoutfailure: message='activity heartbeat
timeout.*", "GAAS"),
+ new ErrorPatternProfile("timeoutexception: failed to allocate memory
within the configured max blocking time.*", "GAAS"),
+ new ErrorPatternProfile("timeoutfailure: message='activity
starttoclose timeout'.*", "GAAS"),
+ new ErrorPatternProfile("timeoutexception: topic
gobblintrackingevent_sgs not present in metadata after 60000 ms.*", "GAAS"),
+ new ErrorPatternProfile("connectexception: connection refused.*",
"SYSTEM INFRA"),
+ new ErrorPatternProfile("connectionisclosedexception: no operations
allowed after connection closed.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("ioexception: failed to get connection for.*is
already stopped.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("ioexception: unable to close file because
dfsclient was unable to contact the hdfs servers.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*hikaripool-.*-datasourceprovider - failed
to execute connection test query.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*interruptedioexception.*", "MISC"),
+ new ErrorPatternProfile("interruptedexception: sleep interrupted.*",
"MISC"),
+ new ErrorPatternProfile("closedbyinterruptexception.*", "MISC"),
+ new ErrorPatternProfile("interruptedexception: \\| failed to commit
writer for partition.*", "MISC"),
+ new ErrorPatternProfile(".*http connection failed for getting token
from azuread.*", "USER"),
+ new ErrorPatternProfile("ioexception: invalid token.*", "USER"),
+ new ErrorPatternProfile(".*remoteexception: token.*can't be found in
cache.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*applicationfailure:.*token.*can't be found
in cache.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*closedchannelexception.*", "MISC"),
+ new ErrorPatternProfile("applicationfailure: message='task failed:
org.apache.gobblin.runtime.forkexception.*", "MISC"),
+ new ErrorPatternProfile(".*eofexception.*", "MISC"),
+ new ErrorPatternProfile("canceledfailure: child canceled.*", "MISC"),
+ new ErrorPatternProfile(".*message='unable to create new native
thread'.*", "GAAS"),
+ new ErrorPatternProfile("ioexception: failed to register hive spec
simplehivespec.*", "USER"),
+ new ErrorPatternProfile("ioexception: error: likely concurrent writing
to destination: \\(just prior\\) tablemetadata.*", "MISC"),
+ new ErrorPatternProfile(".*nullpointerexception:.*", "MISC"),
+ new ErrorPatternProfile(".*classnotfoundexception.*", "MISC"),
+ new ErrorPatternProfile(".*noclassdeffounderror: could not initialize
class.*", "MISC"),
+ new ErrorPatternProfile(".*failure in getting work units for job.*",
"MISC"),
+ new ErrorPatternProfile(".*operation failed: \"the uploaded data is
not contiguous or the position query parameter value is not equal to the length
of the file after appending the uploaded data.*", "MISC"),
+ new ErrorPatternProfile(".*applicationfailure: message='task failed:
operation failed: \"the uploaded data is not contiguous or the position query
parameter value is not equal to the length of the file after appending the
uploaded data.*", "MISC"),
+ new ErrorPatternProfile(".*could not create work unit to deregister
partition.*", "MISC"),
+ new ErrorPatternProfile("runtimeexception: not committing dataset.*",
"MISC"),
+ new ErrorPatternProfile("job.*has unfinished commit sequences. will
not clean up staging data", "MISC"),
+ new ErrorPatternProfile(".*failed to commit.*dataset.*", "MISC"),
+ new ErrorPatternProfile(".*could not commit file.*", "MISC"),
+ new ErrorPatternProfile("runtimeexception: failed to parse the
date.*", "USER"),
+ new ErrorPatternProfile("runtimeexception: clean failed for one or
more datasets.*", "MISC"),
+ new ErrorPatternProfile("runtimeexception: not copying.*", "MISC"),
+ new ErrorPatternProfile(".*failed to read from all available
datanodes.*", "NON FATAL"),
+ new ErrorPatternProfile(".*failed to prepare extractor: error - failed
to get schema for this object.*", "NON FATAL"),
+ new ErrorPatternProfile(".*failed to get primary group for kafkaetl,
using user name as primary group name.*", "NON FATAL"),
+ new ErrorPatternProfile(".*failed to delete.*", "NON FATAL"),
+ new ErrorPatternProfile(".*applicationfailure: message='task failed:
java.lang.arrayindexoutofboundsexception.*", "NON FATAL"),
+ new ErrorPatternProfile(".*arrayindexoutofboundsexception.*failed to
close all open resources", "NON FATAL"),
+ new ErrorPatternProfile("ioexception: failed to clean up all writers.
\\| failed to close all open resources.*", "MISC"),
+ new ErrorPatternProfile("applicationfailure:.*failed to clean up all
writers.*", "MISC"),
+ new ErrorPatternProfile("applicationfailure: message='task failed:
java.io.ioexception: failed to commit all writers..*", "MISC"),
+ new ErrorPatternProfile(".*metaexception.*", "USER"),
+ new ErrorPatternProfile(".*ioexception: filesystem closed.*", "GAAS"),
+ new ErrorPatternProfile(".*source and target table are not
compatible.*", "USER"),
+ new ErrorPatternProfile("hivetablelocationnotmatchexception: desired
target location.*do not agree.*", "USER"),
+ new ErrorPatternProfile(".*schema mismatch between metadata.*",
"USER"),
+ new ErrorPatternProfile(".*could not read all task state files.*",
"MISC"),
+ new ErrorPatternProfile("taskstatestore successfully opened, but no
task states found under.*", "MISC"),
+ new ErrorPatternProfile(".*setting task state to failed.*", "MISC"),
+ new ErrorPatternProfile("illegalstateexception: expected end_array.*",
"GAAS"),
+ new ErrorPatternProfile("illegalstateexception: cannot perform
operation after producer has been closed.*", "GAAS"),
+ new ErrorPatternProfile("webclientresponse.*503 service
unavailable.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("webclientresponseexception.*504.*", "SYSTEM
INFRA"),
+ new
ErrorPatternProfile("webclientresponsewithmessageexception.*504.*", "SYSTEM
INFRA"),
+ new ErrorPatternProfile("webclientresponseexception.*400.*", "USER"),
+ new ErrorPatternProfile("webclientresponseexception.*409.*", "USER"),
+ new
ErrorPatternProfile("webclientresponsewithmessageexception.*402.*", "USER"),
+ new ErrorPatternProfile(".*fsstatestore.*", "MISC"),
+ new ErrorPatternProfile("sqltransientconnectionexception:.*connection
is not available, request timed out after.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("sqlnontransientconnectionexception: got
timeout reading communication packets.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("sqlexception: unsupported transaction
isolation level '-1'.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile("sqlexception: incorrect string
value:.*for.*", "USER"),
+ new ErrorPatternProfile(".*obstateutils.opentaskstatestoreuncached
debug.*", "MISC"),
+ new ErrorPatternProfile("[bug] workflow thread.*can't be destroyed in
time. this will lead to a workflow cache leak. this problem is usually caused
by a workflow implementation swallowing java.lang.error instead of rethrowing
it.*", "SYSTEM INFRA"),
+ new ErrorPatternProfile(".*outofmemoryerror.*", "GAAS"),
+ new ErrorPatternProfile("filealreadyexistsexception: rename
destination.*", "NON FATAL"),
+ new ErrorPatternProfile("applicationfailure: message='failed to
rename.*", "MISC"),
+ new ErrorPatternProfile("potentialdeadlockexception: potential
deadlock detected.*", "GAAS"),
+ new ErrorPatternProfile("workflow lock for the run id hasn't been
released by one of previous execution attempts, consider increasing workflow
task timeout.*", "GAAS"),
+ new ErrorPatternProfile("nosuchelementexception: \\| failed to commit
writer for partition.*", "MISC"),
+ new ErrorPatternProfile("illegalargumentexception: cannot instantiate
jdbcpublisher since it does not extend singletaskdatapublisher.*", "MISC"),
+ new ErrorPatternProfile("processing
failure:.*retrystate=retry_state.*commit workflow failure", "MISC"),
+ new ErrorPatternProfile(".*recordtoolargeexception.*", "MISC"),
+ new ErrorPatternProfile("ioexception: could not get block locations.
source file", "MISC"),
+ new ErrorPatternProfile("ioexception: job status not available \\|
failed to launch and run job.*", "MISC"),
+ new ErrorPatternProfile("enqueuing of event.*timed out. sending of
events is probably stuck", "MISC"),
+ new ErrorPatternProfile("applicationfailure: message='task failed:
status code: -1 error code.*", "MISC"),
+ new ErrorPatternProfile("applicationfailure: message='failing in
submitting at least one task before execution.'.*", "MISC"),
+ new ErrorPatternProfile(".*watermark type simple not recognized.*",
"MISC")
+ );
+
+ public static List<ErrorPatternProfile> getSortedPatterns() {
+ Map<String, Integer> categoryPriority = new HashMap<>();
+ for (ErrorCategory errorCategory : TEST_CATEGORIES) {
+ categoryPriority.put(errorCategory.getCategoryName(),
errorCategory.getPriority());
+ }
+
+ List<ErrorPatternProfile> sortedPatterns = new ArrayList<>(TEST_PATTERNS);
+ sortedPatterns.sort(Comparator.comparingInt(e ->
categoryPriority.getOrDefault(e.getCategoryName(), Integer.MAX_VALUE)));
+ return sortedPatterns;
+ }
+
+ public static List<Issue> testUserCategoryIssues() {
+ log.info("Generating test data for USER category issues");
+ List<Issue> issues = new ArrayList<>();
+
+ // Namespace Quota issues
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "ABCD123",
+ "Job failed due to the namespace quota being exceeded for user data",
"ContextA", "namespaceQuotaError",
+ "quotaexception", singletonMap("keyA", "valueA")));
+
+ // Permission Issues
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1),
IssueSeverity.ERROR, "ABCD123",
+ "Task failed with permission denied when accessing secure directory",
"ContextB", "permissionError",
+ "securityexception", singletonMap("keyB", "valueB")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "ABCD123",
+ "RemoteException: operation category write is not supported in state
observer mode", "ContextC",
+ "writePermissionError", "remoteexception", singletonMap("keyC",
"valueC")));
+
+ // Diskspace Quota Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "ABCD123",
+ "Operation failed because the diskspace quota has been reached",
"ContextD", "diskQuotaError", "quotaexception",
+ singletonMap("keyD", "valueD")));
+
+ // File Not Found Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(4),
IssueSeverity.ERROR, "ABCD123",
+ "FileNotFoundException: /path/to/missing/file.txt not found",
"ContextE", "fileNotFoundError",
+ "filenotfoundexception", singletonMap("keyE", "valueE")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(5),
IssueSeverity.ERROR, "ABCD123",
+ "RemoteException: file/directory /data/temp does not exist",
"ContextF", "directoryNotFoundError",
+ "remoteexception", singletonMap("keyF", "valueF")));
+
+ // Token Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(6),
IssueSeverity.ERROR, "ABCD123",
+ "HTTP connection failed for getting token from AzureAD authentication
service", "ContextG", "tokenError",
+ "authexception", singletonMap("keyG", "valueG")));
+
+ return issues;
+ }
+
+ public static List<Issue> testSystemInfraCategoryIssues() {
+ // Returns a list of Issues that should match SYSTEM INFRA category regex
patterns
+ log.info("Generating test data for SYSTEM_INFRA category issues");
+ List<Issue> issues = new ArrayList<>();
+
+ // Server Issues
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR,
"SYSTEM_INFRA",
+ "RemoteException: server too busy to handle request at this time",
"ContextA", "serverBusyError",
+ "remoteexception", singletonMap("keyA", "valueA")));
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "JSchException: session is down and cannot be restored", "ContextB",
"sessionDownError", "jschexception",
+ singletonMap("keyB", "valueB")));
+
+ // Namenode Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "SafeModeException: Name node is in safe mode and cannot accept
changes", "ContextC", "safeModeError",
+ "safemodeexception", singletonMap("keyC", "valueC")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "RemoteException: no namenode available to invoke for cluster
operations", "ContextD", "namenodeError",
+ "remoteexception", singletonMap("keyD", "valueD")));
+
+ // Timeout Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(4),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "SocketTimeoutException: Read timeout after 30000ms waiting for
response", "ContextE", "timeoutError",
+ "sockettimeoutexception", singletonMap("keyE", "valueE")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(5),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "SocketException: connection reset by peer during data transfer",
"ContextF", "connectionResetError",
+ "socketexception", singletonMap("keyF", "valueF")));
+
+ // Connection Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(6),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "ConnectException: connection refused when trying to connect to remote
service", "ContextG",
+ "connectionRefusedError", "connectexception", singletonMap("keyG",
"valueG")));
+
+ return issues;
+ }
+
+ public static List<Issue> testGaasCategoryIssues() {
+ // Returns a list of Issues that should match GAAS category regex patterns
+ log.info("Generating test data for GAAS category issues");
+ List<Issue> issues = new ArrayList<>();
+
+ String testString = "TimeoutFailure: message='activity heartbeat timeout
exceeded configured limit'"; // TBD: DELETE
+ // Timeout Issues
+ issues.add(
+ new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "GAAS",
testString, "ContextA", "activityTimeoutError",
+ "timeoutfailure", singletonMap("keyA", "valueA")));
+
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1),
IssueSeverity.ERROR, "GAAS",
+ "TimeoutException: failed to allocate memory within the configured max
blocking time of 5000ms", "ContextB",
+ "memoryTimeoutError", "timeoutexception", singletonMap("keyB",
"valueB")));
+
+ // Thread Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "GAAS",
+ "ApplicationFailure: message='unable to create new native thread -
system limits reached'", "ContextC",
+ "threadCreationError", "applicationfailure", singletonMap("keyC",
"valueC")));
+
+ // Filesystem Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "GAAS",
+ "IOException: filesystem closed unexpectedly during operation",
"ContextD", "filesystemClosedError",
+ "ioexception", singletonMap("keyD", "valueD")));
+
+ // Illegal State Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(4),
IssueSeverity.ERROR, "GAAS",
+ "IllegalStateException: expected END_ARRAY but found different token
type", "ContextE", "illegalStateError",
+ "illegalstateexception", singletonMap("keyE", "valueE")));
+
+ return issues;
+ }
+
+ public static List<Issue> testMiscCategoryIssues() {
+ // Returns a list of Issues that should match MISC category regex patterns
+ log.info("Generating test data for MISC category issues");
+ List<Issue> issues = new ArrayList<>();
+
+ // Interruption Issues
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.WARN, "MISC",
+ "InterruptedIOException occurred during file operation", "ContextA",
"interruptedIOError",
+ "interruptedioexception", singletonMap("keyA", "valueA")));
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1), IssueSeverity.WARN,
"MISC",
+ "InterruptedException: sleep interrupted by external signal",
"ContextB", "sleepInterruptedError",
+ "interruptedexception", singletonMap("keyB", "valueB")));
+
+ // Null Pointer Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "MISC",
+ "NullPointerException: attempt to invoke method on null object
reference", "ContextC", "nullPointerError",
+ "nullpointerexception", singletonMap("keyC", "valueC")));
+
+ // Class Not Found Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "MISC",
+ "ClassNotFoundException: com.example.MissingClass not found in
classpath", "ContextD", "classNotFoundError",
+ "classnotfoundexception", singletonMap("keyD", "valueD")));
+
+ // Concurrent Write Issues
+ issues.add(new Issue(ZonedDateTime.now().minusHours(4),
IssueSeverity.ERROR, "MISC",
+ "IOException: error: likely concurrent writing to destination: (just
prior) TableMetadata collision detected",
+ "ContextE", "concurrentWriteError", "ioexception",
singletonMap("keyE", "valueE")));
+
+ return issues;
+ }
+
+ public static List<Issue> testNonFatalCategoryIssues() {
+ log.info("Generating test data for NON FATAL category issues");
+ // Returns a list of Issues that should match NON FATAL category regex
patterns
+ List<Issue> issues = new ArrayList<>();
+
+
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "NON FATAL",
+ "filealreadyexistsexception: rename destination ", "ContextA",
"copyDataWarning",
+ "copydatapublisher", singletonMap("keyA", "valueA")));
+
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1),
IssueSeverity.ERROR, "NON FATAL",
+ "Warning: failed to read from all available datanodes, retrying with
backup nodes", "ContextB",
+ "datanodeReadWarning", "ioexception", singletonMap("keyB", "valueB")));
+
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "NON FATAL",
+ "Info: failed to delete temporary file - cleanup will retry later",
"ContextC", "deleteFailureInfo",
+ "ioexception", singletonMap("keyC", "valueC")));
+
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "NON FATAL",
+ "1234: arrayindexoutofboundsexception. Operation failed to close all
open resources", "ContextD",
+ "reporterException", "reporterexception", singletonMap("keyD",
"valueD")));
+
+ return issues;
+ }
+
+ public static List<Issue> testNegativeMatchCases() {
+ log.info("Generating test data for negative match cases");
+ // Returns a list of Issues that should NOT match any specific regex
patterns
+ List<Issue> issues = new ArrayList<>();
+
+ // Generic success messages
+ issues.add(
+ new Issue(ZonedDateTime.now(), IssueSeverity.INFO, "SUCCESS", "Job
completed successfully without any errors",
+ "ContextA", "successInfo", "success", singletonMap("keyA",
"valueA")));
+
+ // Custom application errors not in regex patterns
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1),
IssueSeverity.ERROR, "CUSTOM",
+ "CustomBusinessLogicException: validation failed for user input",
"ContextB", "customError",
+ "businessexception", singletonMap("keyB", "valueB")));
+
+ // Generic warnings that don't match patterns
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.WARN, "GENERIC",
+ "Process completed with warnings but no critical errors detected",
"ContextC", "genericWarning", "warning",
+ singletonMap("keyC", "valueC")));
+
+ // Unrelated error messages
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "UNRELATED",
+ "Database connection pool exhausted - increase pool size
configuration", "ContextD", "poolError",
+ "poolexception", singletonMap("keyD", "valueD")));
+
+ return issues;
+ }
+
+ public static List<Issue> testEdgeCasesAndBoundaryConditions() {
+ log.info("Generating test data for edge cases and boundary conditions");
+ // Returns a list of Issues to test edge cases and boundary conditions
+ List<Issue> issues = new ArrayList<>();
+
+ // Case sensitivity tests
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "CASE_TEST",
+ "FILENOTFOUNDEXCEPTION: uppercase version of file not found error",
"ContextA", "uppercaseError",
+ "filenotfoundexception", singletonMap("keyA", "valueA")));
+
+ // Partial pattern matches
+ issues.add(new Issue(ZonedDateTime.now().minusDays(1),
IssueSeverity.ERROR, "PARTIAL_TEST",
+ "This is a permission error but not exactly permission denied",
"ContextB", "partialPermissionError",
+ "securityexception", singletonMap("keyB", "valueB")));
+
+ // Multiple pattern matches in single description
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "MULTI_MATCH",
+ "FileNotFoundException: permission denied when accessing the namespace
quota restricted file", "ContextC",
+ "multiMatchError", "complexexception", singletonMap("keyC",
"valueC")));
+
+ // Empty and null-like descriptions
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.ERROR, "EMPTY_TEST", "", "ContextD",
+ "emptyDescriptionError", "unknownexception", singletonMap("keyD",
"valueD")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(4),
IssueSeverity.ERROR, "NULL_TEST", null, "ContextE",
+ "nullDescriptionError", "unknownexception", singletonMap("keyE",
"valueE")));
+
+ // Very long descriptions
+ issues.add(new Issue(ZonedDateTime.now().minusHours(5),
IssueSeverity.ERROR, "LONG_TEST",
+ "This is a very long error description that contains the phrase
'permission denied' somewhere in the middle of a lot of other text that might
make pattern matching more challenging to ensure our regex patterns work
correctly even with verbose error messages",
+ "ContextF", "longDescriptionError", "verboseexception",
singletonMap("keyF", "valueF")));
+
+ return issues;
+ }
+
+ public static List<Issue> testNonFatalAndUnknownMix() {
+ // Returns a list of Issues that should match NON FATAL and UNKNOWN
categories
+ List<Issue> issues = new ArrayList<>();
+
+ // NON FATAL issues
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "NON FATAL",
+ "applicationfailure: message='task failed:
java.lang.arrayindexoutofboundsexception"
+ + "ues", "ContextA", "copyDataWarning",
+ "copydatapublisher", singletonMap("keyA", "valueA")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(1),
IssueSeverity.ERROR, "NON FATAL",
+ "Info: failed to delete temporary file - cleanup will retry later",
"ContextB", "deleteFailureInfo",
+ "ioexception", singletonMap("keyB", "valueB")));
+
+ // UNKNOWN issues (should not match any known category)
+ issues.add(new Issue(ZonedDateTime.now().minusHours(2),
IssueSeverity.ERROR, "UNKNOWN",
+ "CustomApplicationException: unexpected error occurred in module",
"ContextC", "customError", "customexception",
+ singletonMap("keyC", "valueC")));
+ issues.add(new Issue(ZonedDateTime.now().minusHours(3),
IssueSeverity.WARN, "UNKNOWN",
+ "Process completed with warnings but no critical errors detected",
"ContextD", "genericWarning", "warning",
+ singletonMap("keyD", "valueD")));
+
+ return issues;
+ }
+
+ public static List<Issue> testMaximumErrorCountExceeded() {
+ // Returns a list of Issues where the number of errors exceeds the maximum
allowed count
+ // This should trigger overflow/limit logic to handle excessive error
volumes
+ List<Issue> issues = new ArrayList<>();
+
+ // Add 15 USER errors (highest priority)
+ for (int i = 0; i < 15; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(i),
IssueSeverity.ERROR, "USER",
+ "FileNotFoundException: /path/to/missing/file" + i + ".txt not
found", "ContextU" + i,
+ "fileNotFoundError" + i, "filenotfoundexception",
singletonMap("keyU" + i, "valueU" + i)));
+ }
+
+ // Add 15 SYSTEM INFRA errors
+ for (int i = 0; i < 15; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(15 + i),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "RemoteException: server too busy to handle request " + i,
"ContextS" + i, "serverBusyError" + i,
+ "remoteexception", singletonMap("keyS" + i, "valueS" + i)));
+ }
+
+ // Add 15 GAAS errors
+ for (int i = 0; i < 15; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(30 + i),
IssueSeverity.ERROR, "GAAS",
+ "TimeoutFailure: message='activity heartbeat timeout " + i + "'",
"ContextG" + i, "timeoutError" + i,
+ "timeoutfailure", singletonMap("keyG" + i, "valueG" + i)));
+ }
+
+ // Add 15 MISC errors
+ for (int i = 0; i < 15; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(45 + i),
IssueSeverity.ERROR, "MISC",
+ "NullPointerException: null reference in operation " + i, "ContextM"
+ i, "nullPointerError" + i,
+ "nullpointerexception", singletonMap("keyM" + i, "valueM" + i)));
+ }
+
+ // Add 10 NON FATAL errors (lower priority)
+ for (int i = 0; i < 10; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(60 + i),
IssueSeverity.WARN, "NON FATAL",
+ "Warning: failed to delete temporary file " + i + " - cleanup will
retry later", "ContextN" + i,
+ "deleteFailureWarning" + i, "ioexception", singletonMap("keyN" + i,
"valueN" + i)));
+ }
+
+ return issues;
+ }
+
+ public static List<Issue> testMinimumErrorCountBelowThreshold() {
+ // Returns a list of Issues where the number of errors is less than the
maximum threshold required for a category
+ // This should not trigger category match or should be handled gracefully
+ List<Issue> issues = new ArrayList<>();
+
+ // Single error from each category - assuming maximum threshold is > 1
+ // These should either not be classified or handled with special logic
+
+ // Single USER error
+ issues.add(new Issue(ZonedDateTime.now(), IssueSeverity.ERROR, "USER",
+ "FileNotFoundException: /single/missing/file.txt not found",
"ContextU1", "singleFileNotFoundError",
+ "filenotfoundexception", singletonMap("keyU1", "valueU1")));
+
+ // Single SYSTEM INFRA error
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(5),
IssueSeverity.ERROR, "SYSTEM_INFRA",
+ "RemoteException: server too busy - single occurrence", "ContextS1",
"singleServerBusyError", "remoteexception",
+ singletonMap("keyS1", "valueS1")));
+
+ // Single GAAS error
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(10),
IssueSeverity.ERROR, "GAAS",
+ "TimeoutFailure: message='single activity heartbeat timeout'",
"ContextG1", "singleTimeoutError",
+ "timeoutfailure", singletonMap("keyG1", "valueG1")));
+
+ // Single MISC error
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(15),
IssueSeverity.ERROR, "MISC",
+ "NullPointerException: single null reference occurrence", "ContextM1",
"singleNullPointerError",
+ "nullpointerexception", singletonMap("keyM1", "valueM1")));
+
+ // Single NON FATAL error
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(20),
IssueSeverity.ERROR, "NON FATAL",
+ "Warning: single failed delete operation - cleanup will retry",
"ContextN1", "singleDeleteFailureWarning",
+ "ioexception", singletonMap("keyN1", "valueN1")));
+
+ return issues;
+ }
+
+ public static List<Issue>
testMixedCategoryIssuesFromLowestToHighestPriority() {
+ // Returns a list of Issues exceeding the display/processing limit,
ordered from lowest to highest priority
+ List<Issue> issues = new ArrayList<>();
+
+ // Add 5 UNKNOWN (lowest priority) - errors that don't match any regex
patterns
+ String[] unknownErrors =
+ {"CustomBusinessLogicException: validation failed for user input",
"DatabaseConnectionException: connection pool exhausted",
"ConfigurationException: invalid property value detected", "SecurityException:
authentication failed with external service", "NetworkException: unable to
reach external API endpoint"};
+ for (int i = 0; i < 5; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(60 + i),
IssueSeverity.INFO, "UNKNOWN", unknownErrors[i],
+ "ContextU" + i, "unknownError" + i, "unknownexception",
singletonMap("keyU" + i, "valueU" + i)));
+ }
+
+ // Add 5 NON FATAL - errors that match NON FATAL regex patterns
+ String[] nonFatalErrors =
+ {"CopyDataPublisher warning: srcfs operation completed with minor
issues", "Warning: failed to read from all available datanodes, retrying with
backup nodes", "Info: failed to delete temporary file - cleanup will retry
later", "Exception thrown from InGraphsReporter#report. Exception was
suppressed and processing continued", "ApplicationFailure: message='task
failed: java.lang.ArrayIndexOutOfBoundsException at index 5'"};
+ for (int i = 0; i < 5; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(50 + i),
IssueSeverity.WARN, "NON FATAL", nonFatalErrors[i],
+ "ContextN" + i, "nonFatalError" + i, "nonfatalexception",
singletonMap("keyN" + i, "valueN" + i)));
+ }
+
+ // Add 5 MISC - errors that match MISC regex patterns
+ String[] miscErrors =
+ {"InterruptedIOException occurred during file operation",
"NullPointerException: attempt to invoke method on null object reference",
"ClassNotFoundException: com.example.MissingClass not found in classpath",
"IOException: error: likely concurrent writing to destination: (just prior)
TableMetadata collision detected", "ApplicationFailure: failed to clean up all
writers during shutdown"};
+ for (int i = 0; i < 5; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(40 + i),
IssueSeverity.ERROR, "MISC", miscErrors[i],
+ "ContextM" + i, "miscError" + i, "miscexception",
singletonMap("keyM" + i, "valueM" + i)));
+ }
+
+ // Add 5 GAAS - errors that match GAAS regex patterns
+ String[] gaasErrors =
+ {"TimeoutFailure: message='activity heartbeat timeout exceeded
configured limit'", "TimeoutException: failed to allocate memory within the
configured max blocking time of 5000ms", "ApplicationFailure: message='unable
to create new native thread - system limits reached'", "IOException: filesystem
closed unexpectedly during operation", "IllegalStateException: expected
END_ARRAY but found different token type"};
+ for (int i = 0; i < 5; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(30 + i),
IssueSeverity.ERROR, "GAAS", gaasErrors[i],
+ "ContextG" + i, "gaasError" + i, "gaasexception",
singletonMap("keyG" + i, "valueG" + i)));
+ }
+
+ // Add 5 SYSTEM INFRA - errors that match SYSTEM INFRA regex patterns
+ String[] systemInfraErrors =
+ {"RemoteException: server too busy to handle request at this time",
"JSchException: session is down and cannot be restored", "SafeModeException:
Name node is in safe mode and cannot accept changes", "SocketTimeoutException:
Read timeout after 30000ms waiting for response", "ConnectException: connection
refused when trying to connect to remote service"};
+ for (int i = 0; i < 5; i++) {
+ issues.add(
+ new Issue(ZonedDateTime.now().minusMinutes(20 + i),
IssueSeverity.ERROR, "SYSTEM_INFRA", systemInfraErrors[i],
+ "ContextS" + i, "systemInfraError" + i, "systeminfraexception",
singletonMap("keyS" + i, "valueS" + i)));
+ }
+
+ // Add 5 USER (highest priority) - errors that match USER regex patterns
+ String[] userErrors =
+ {"Job failed due to the namespace quota being exceeded for user data",
"Task failed with permission denied when accessing secure directory",
"Operation failed because the diskspace quota has been reached",
"FileNotFoundException: /path/to/missing/file.txt not found", "HTTP connection
failed for getting token from AzureAD authentication service"};
+ for (int i = 0; i < 5; i++) {
+ issues.add(new Issue(ZonedDateTime.now().minusMinutes(10 + i),
IssueSeverity.ERROR, "USER", userErrors[i],
+ "ContextUR" + i, "userError" + i, "userexception",
singletonMap("keyUR" + i, "valueUR" + i)));
+ }
+
+ return issues;
+ }
+}
+
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
index a6e729200f..ea26ede30f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime.troubleshooter;
+import java.util.List;
import java.util.Objects;
import java.util.Properties;
@@ -29,6 +30,8 @@ import com.typesafe.config.Config;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.util.GsonUtils;
@@ -40,7 +43,7 @@ import org.apache.gobblin.util.ConfigUtils;
*
* It will additionally log received issues, so that they can be processed by
an analytical systems to determine
* the overall platform health.
- * */
+ **/
@Slf4j
public class JobIssueEventHandler {
@@ -53,14 +56,17 @@ public class JobIssueEventHandler {
private final MultiContextIssueRepository issueRepository;
private final boolean logReceivedEvents;
+ private final Config config;
+
@Inject
public JobIssueEventHandler(MultiContextIssueRepository issueRepository,
Config config) {
- this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS,
true));
+ this(issueRepository, ConfigUtils.getBoolean(config, LOG_RECEIVED_EVENTS,
true), config);
}
- public JobIssueEventHandler(MultiContextIssueRepository issueRepository,
boolean logReceivedEvents) {
+ public JobIssueEventHandler(MultiContextIssueRepository issueRepository,
boolean logReceivedEvents, Config config) {
this.issueRepository = Objects.requireNonNull(issueRepository);
this.logReceivedEvents = logReceivedEvents;
+ this.config = config;
}
public void processEvent(GobblinTrackingEvent event) {
@@ -83,8 +89,9 @@ public class JobIssueEventHandler {
try {
issueRepository.put(contextId, issueEvent.getIssue());
} catch (TroubleshooterException e) {
- log.warn(String.format("Failed to save issue to repository. Issue time:
%s, code: %s",
- issueEvent.getIssue().getTime(),
issueEvent.getIssue().getCode()), e);
+ log.warn(
+ String.format("Failed to save issue to repository. Issue time: %s,
code: %s", issueEvent.getIssue().getTime(),
+ issueEvent.getIssue().getCode()), e);
}
if (logReceivedEvents) {
@@ -114,4 +121,23 @@ public class JobIssueEventHandler {
String jobName;
Issue issue;
}
-}
+
+ public List<Issue> getErrorListForClassification(String contextId)
+ throws TroubleshooterException {
+ int limit = ConfigUtils.getInt(config,
ServiceConfigKeys.ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS_KEY,
+ ServiceConfigKeys.DEFAULT_ERROR_CLASSIFICATION_MAX_ERRORS_TO_PROCESS);
+ return issueRepository.getMostRecentErrors(contextId, limit);
+ }
+
+ public void logFinalError(Issue issue, String flowName, String flowGroup,
String flowExecutionId, String jobName) {
+ JobIssueLogEntry logEntry = new JobIssueLogEntry();
+ logEntry.issue = issue;
+ logEntry.flowName = flowName;
+ logEntry.flowGroup = flowGroup;
+ logEntry.flowExecutionId = flowExecutionId;
+ logEntry.jobName = jobName;
+
+ String serializedIssueEvent =
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(logEntry);
+ issueLogger.info(serializedIssueEvent);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
index 1d30ed3131..0b6f2279e4 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/MultiContextIssueRepository.java
@@ -37,6 +37,12 @@ public interface MultiContextIssueRepository extends Service
{
List<Issue> getAll(String contextId)
throws TroubleshooterException;
+ /**
+   * Returns issues with severity ERROR, in the reverse order in which they
were put into the repository (most recent first).
+   */
+ List<Issue> getMostRecentErrors(String contextId, int limit)
+ throws TroubleshooterException;
+
void put(String contextId, Issue issue)
throws TroubleshooterException;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
index f47f9085fb..b3716d66f0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/NoopIssueRepository.java
@@ -29,6 +29,12 @@ public class NoopIssueRepository implements IssueRepository {
return Collections.emptyList();
}
+ @Override
+ public List<Issue> getMostRecentErrors(int limit)
+ throws TroubleshooterException {
+ return Collections.emptyList();
+ }
+
@Override
public void put(Issue issue)
throws TroubleshooterException {
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/ErrorClassifierTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/ErrorClassifierTest.java
new file mode 100644
index 0000000000..9ba3b56615
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/ErrorClassifierTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.gobblin.configuration.ErrorCategory;
+import org.apache.gobblin.configuration.ErrorPatternProfile;
+import org.apache.gobblin.metastore.InMemoryErrorPatternStore;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueTestDataProvider;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import static org.testng.Assert.*;
+
+
+public class ErrorClassifierTest {
+ private ErrorClassifier classifier;
+ private List<ErrorCategory> categories;
+ private ErrorCategory _defaultErrorCategory;
+ private InMemoryErrorPatternStore store;
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ // Use the shared test data
+ categories = IssueTestDataProvider.TEST_CATEGORIES;
+ _defaultErrorCategory = IssueTestDataProvider.TEST_DEFAULT_ERROR_CATEGORY;
+ List<ErrorPatternProfile> errorPatternProfiles =
IssueTestDataProvider.getSortedPatterns();
+
+ Config testConfig = ConfigFactory.empty();
+ store = new InMemoryErrorPatternStore(testConfig);
+ store.upsertCategory(categories);
+ store.upsertPatterns(errorPatternProfiles);
+ store.setDefaultCategory(_defaultErrorCategory);
+
+ classifier = new ErrorClassifier(store, testConfig);
+ }
+
+ @Test
+ public void testUserCategoryIssues() {
+ List<Issue> issues = IssueTestDataProvider.testUserCategoryIssues();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("USER"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ assertTrue(
+ result.getDetails().contains("permission denied") ||
result.getDetails().contains("FileNotFoundException"));
+ }
+
+ @Test
+ public void testSystemInfraCategoryIssues() {
+ List<Issue> issues = IssueTestDataProvider.testSystemInfraCategoryIssues();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("SYSTEM INFRA"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ assertTrue(result.getDetails().contains("server too busy") ||
result.getDetails().contains("SafeModeException"));
+ }
+
+ @Test
+ public void testGaasCategoryIssues() {
+ List<Issue> issues = IssueTestDataProvider.testGaasCategoryIssues();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("GAAS"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ assertTrue(result.getDetails().contains("TimeoutFailure") ||
result.getDetails().contains("ApplicationFailure"));
+ }
+
+ @Test
+ public void testMiscCategoryIssues() {
+ List<Issue> issues = IssueTestDataProvider.testMiscCategoryIssues();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("MISC"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ assertTrue(
+ result.getDetails().contains("NullPointerException") ||
result.getDetails().contains("InterruptedIOException"));
+ }
+
+ @Test
+ public void testNonFatalCategoryIssues() {
+ List<Issue> issues = IssueTestDataProvider.testNonFatalCategoryIssues();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("NON FATAL"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ assertTrue(result.getDetails().contains("CopyDataPublisher") ||
result.getDetails()
+ .contains("failed to delete temporary file"));
+ }
+
+ @Test
+ public void testNegativeMatchCases() {
+ List<Issue> issues = IssueTestDataProvider.testNegativeMatchCases();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("UNKNOWN"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ }
+
+ @Test
+ public void testEdgeCasesAndBoundaryConditions() {
+ List<Issue> issues =
IssueTestDataProvider.testEdgeCasesAndBoundaryConditions();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("USER"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ }
+
+ @Test
+ public void testNonFatalAndUnknownMix() {
+ List<Issue> issues = IssueTestDataProvider.testNonFatalAndUnknownMix();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+
+ assertTrue(result.getSummary().contains("UNKNOWN"));
+
+ }
+
+ @Test
+ public void testMaximumErrorCountExceeded() {
+ List<Issue> issues = IssueTestDataProvider.testMaximumErrorCountExceeded();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("USER"));
+
+ assertTrue(result.getDetails().contains("/path/to/missing/file1"));
+ assertTrue(result.getDetails().contains("/path/to/missing/file9"));
+
+ assertFalse(result.getDetails().contains("/path/to/missing/file11"));
+ assertFalse(result.getDetails().contains("/path/to/missing/file14"));
+
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ }
+
+ @Test
+ public void testMinimumErrorCountBelowThreshold() {
+ List<Issue> issues =
IssueTestDataProvider.testMinimumErrorCountBelowThreshold();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+
+ assertTrue(result.getSummary().contains("USER"));
+ assertTrue(result.getDetails().contains("FileNotFoundException"));
+
+ assertFalse(result.getDetails().contains("server too busy"));
+
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ }
+
+ @Test
+ public void testMixedCategoryIssuesFromLowestToHighestPriority() {
+ List<Issue> issues =
IssueTestDataProvider.testMixedCategoryIssuesFromLowestToHighestPriority();
+ Issue result = classifier.classifyEarlyStopWithDefault(issues);
+ assertNotNull(result);
+ assertTrue(result.getSummary().contains("USER"));
+ assertEquals(result.getSeverity(), IssueSeverity.ERROR);
+ }
+}
+
+
+
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
index 616d8de0ba..3901ca1999 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandlerTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.runtime.troubleshooter;
import org.testng.annotations.Test;
+import com.typesafe.config.ConfigFactory;
+
import org.apache.gobblin.metrics.event.TimingEvent;
import static org.mockito.Mockito.any;
@@ -32,7 +34,7 @@ public class JobIssueEventHandlerTest {
public void canHandleIssue()
throws Exception {
MultiContextIssueRepository issueRepository =
mock(MultiContextIssueRepository.class);
- JobIssueEventHandler eventHandler = new
JobIssueEventHandler(issueRepository, true);
+ JobIssueEventHandler eventHandler = new
JobIssueEventHandler(issueRepository, true, ConfigFactory.empty());
IssueEventBuilder eventBuilder = new IssueEventBuilder("TestJob");
eventBuilder.setIssue(getTestIssue("test issue", "code1"));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index b428962c1b..b78db5b1f7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -35,6 +35,7 @@ import com.typesafe.config.Config;
import javax.inject.Singleton;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.ErrorClassifier;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
@@ -93,6 +94,8 @@ import
org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitorFactory;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.metastore.ErrorPatternStore;
+import org.apache.gobblin.metastore.InMemoryErrorPatternStore;
public class GobblinServiceGuiceModule implements Module {
@@ -186,8 +189,12 @@ public class GobblinServiceGuiceModule implements Module {
if (serviceConfig.isJobStatusMonitorEnabled()) {
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+ binder.bind(ErrorClassifier.class);
+ binder.bind(ErrorPatternStore.class)
+ .to(getClassByNameOrAlias(ErrorPatternStore.class,
serviceConfig.getInnerConfig(),
+ ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS,
+ InMemoryErrorPatternStore.class.getName()));
}
-
binder.bind(FlowStatusGenerator.class);
if (serviceConfig.isSchedulerEnabled()) {
@@ -266,4 +273,4 @@ public class GobblinServiceGuiceModule implements Module {
.build();
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
index 3c4c60d934..b6ba738767 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/troubleshooter/MySqlMultiContextIssueRepository.java
@@ -123,6 +123,53 @@ public class MySqlMultiContextIssueRepository extends
AbstractIdleService implem
}
}
+ @Override
+ public List<Issue> getMostRecentErrors(String contextId, int limit)
+ throws TroubleshooterException {
+ Objects.requireNonNull(contextId, "contextId should not be null");
+
+ String querySql = "select code, time, severity, summary, details,
source_class, exception_class, properties "
+ + "from issues where context_id = ? and severity = 'ERROR' order by
time desc limit ?";
+
+ try (Connection connection =
databaseProvider.getDatasource().getConnection();
+ PreparedStatement statement = connection.prepareStatement(querySql)) {
+
+ statement.setString(1, contextId);
+ statement.setLong(2, limit);
+
+ ArrayList<Issue> issues = new ArrayList<>();
+
+ try (ResultSet results = statement.executeQuery()) {
+ while (results.next()) {
+ Issue.IssueBuilder issue = Issue.builder();
+ issue.code(results.getString(1));
+
issue.time(ZonedDateTime.ofInstant(Instant.ofEpochMilli(results.getTimestamp(2).getTime()),
ZoneOffset.UTC));
+ issue.severity(IssueSeverity.valueOf(results.getString(3)));
+ issue.summary(results.getString(4));
+ issue.details(results.getString(5));
+ issue.sourceClass(results.getString(6));
+ issue.exceptionClass(results.getString(7));
+
+ String serializedProperties = results.getString(8);
+ if (serializedProperties != null) {
+ Type mapType = new TypeToken<HashMap<String, String>>() {
+ }.getType();
+
+ HashMap<String, String> properties =
+
GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(serializedProperties, mapType);
+ issue.properties(properties);
+ }
+
+ issues.add(issue.build());
+ }
+ }
+
+ return issues;
+ } catch (SQLException e) {
+ throw new TroubleshooterException("Cannot read issues from the
database", e);
+ }
+ }
+
@Override
public void put(String contextId, Issue issue)
throws TroubleshooterException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 74a864da37..6d567dc5b7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -44,6 +44,7 @@ import
org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.ErrorClassifier;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -66,9 +67,9 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
private Meter messageParseFailures;
public KafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads, JobIssueEventHandler jobIssueEventHandler,
- GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore)
+ GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore, ErrorClassifier
errorClassifier)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer, dagManagementStateStore);
+ super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer, dagManagementStateStore, errorClassifier);
if (ConfigUtils.getBoolean(config,
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new
KafkaAvroSchemaRegistryFactory().
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index f4a18d4e2f..c2afaa0498 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.inject.Inject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -58,11 +59,14 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.ErrorClassifier;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
import org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -89,8 +93,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
//We use table suffix that is different from the Gobblin job state store
suffix of jst to avoid confusion.
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
- public static final String GET_AND_SET_JOB_STATUS =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus");
+ public static final String GET_AND_SET_JOB_STATUS =
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus");
private static final String PROCESS_JOB_ISSUE = MetricRegistry
.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
JOB_STATUS_MONITOR_PREFIX, "jobIssueProcessingTime");
@@ -124,9 +128,12 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final GaaSJobObservabilityEventProducer eventProducer;
private final DagManagementStateStore dagManagementStateStore;
private final List<Class<? extends Exception>> nonRetryableExceptions =
Collections.singletonList(SQLIntegrityConstraintViolationException.class);
+ private final boolean isErrorClassificationEnabled;
+ private final ErrorClassifier errorClassifier;
+ @Inject
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler,
- GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore)
+ GaaSJobObservabilityEventProducer observabilityEventProducer,
DagManagementStateStore dagManagementStateStore, ErrorClassifier
errorClassifier)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
String stateStoreFactoryClass = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY,
FileContextBasedFsStateStoreFactory.class.getName());
@@ -137,11 +144,13 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.jobIssueEventHandler = jobIssueEventHandler;
this.dagManagementStateStore = dagManagementStateStore;
+ this.errorClassifier = errorClassifier;
Config retryerOverridesConfig =
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
: ConfigFactory.empty();
// log exceptions to expose errors we suffer under and/or guide
intervention when resolution not readily forthcoming
+
this.persistJobStatusRetryer =
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() {
@Override
@@ -153,7 +162,9 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
}));
- this.eventProducer = observabilityEventProducer;
+ this.eventProducer = observabilityEventProducer;
+ this.isErrorClassificationEnabled =
+ ConfigUtils.getBoolean(this.config,
ServiceConfigKeys.ERROR_CLASSIFICATION_ENABLED_KEY, false);
}
public enum NewState {
@@ -177,13 +188,13 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
@Override
public void shutDown() {
- super.shutDown();
- this.scheduledExecutorService.shutdown();
- try {
- this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.error("Exception encountered when shutting down state store
cleaner", e);
- }
+ super.shutDown();
+ this.scheduledExecutorService.shutdown();
+ try {
+ this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Exception encountered when shutting down state store
cleaner", e);
+ }
}
@Override
@@ -234,6 +245,28 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
if (updatedJobStatus.getRight() == NewState.FINISHED &&
!retryRequired) {
+ if (isErrorClassificationEnabled) {
+ if
(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()))
{
+ long startTime = System.currentTimeMillis();
+ try {
+ List<Issue> issues =
jobIssueEventHandler.getErrorListForClassification(
+
TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+ Issue finalIssue =
errorClassifier.classifyEarlyStopWithDefault(issues);
+ if (finalIssue != null) {
+ jobIssueEventHandler.logFinalError(finalIssue, flowName,
flowGroup, String.valueOf(flowExecutionId),
+ jobName);
+ }
+ }
+ catch (Exception e) {
+ log.error("Failed to emit issue event for flowGroup: {},
flowName: {}, flowExecutionId: {}, jobName: {}", flowGroup, flowName,
flowExecutionId, jobName, e);
+ }
+ long processDuration = System.currentTimeMillis() - startTime;
+ log.info(
+ "Processing issues for flowGroup: {}, flowName: {},
flowExecutionId: {}, jobName: {}, duration: {} ms",
+ flowGroup, flowName, flowExecutionId, jobName,
processDuration);
+ }
+ }
+
// do not send event if retry is required, because it can alert
users to re-submit a job that is already set to be retried by GaaS
this.eventProducer.emitObservabilityEvent(jobStatus);
}
@@ -326,7 +359,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
// We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
// The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
// And job status reflect the execution status in one attempt
- if (!isFlowStatusAndPendingResume && (previousStatus != null &&
currentStatus != null && (previousGeneration > currentGeneration || (
+ if (!isFlowStatusAndPendingResume && (previousStatus != null &&
currentStatus != null && (previousGeneration > currentGeneration || (
previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
&&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
<
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))))
{
@@ -437,4 +470,4 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
protected abstract GobblinTrackingEvent
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message);
protected abstract org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEvent event);
-}
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index cb7b3d3bad..86ceaecfc7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -30,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.runtime.ErrorClassifier;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
@@ -51,15 +52,17 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
private final MultiContextIssueRepository issueRepository;
private final boolean instrumentationEnabled;
private final DagManagementStateStore dagManagementStateStore;
+ private final ErrorClassifier errorClassifier;
@Inject
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- GobblinInstanceEnvironment env, DagManagementStateStore
dagManagementStateStore) {
+ GobblinInstanceEnvironment env, DagManagementStateStore
dagManagementStateStore, ErrorClassifier errorClassifier) {
this.config = Objects.requireNonNull(config);
this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
this.issueRepository = issueRepository;
this.instrumentationEnabled = env.isInstrumentationEnabled();
this.dagManagementStateStore = dagManagementStateStore;
+ this.errorClassifier = errorClassifier;
}
private KafkaJobStatusMonitor createJobStatusMonitor()
@@ -94,7 +97,7 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
return (KafkaJobStatusMonitor) GobblinConstructorUtils
.invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
- dagManagementStateStore);
+ dagManagementStateStore, errorClassifier);
}
@Override
@@ -105,4 +108,4 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
throw new RuntimeException(e);
}
}
-}
+}
\ No newline at end of file