[ https://issues.apache.org/jira/browse/GOBBLIN-2211?focusedWorklogId=975621&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975621 ]
ASF GitHub Bot logged work on GOBBLIN-2211:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Jul/25 09:35
            Start Date: 22/Jul/25 09:35
    Worklog Time Spent: 10m
      Work Description: NamsB7 commented on code in PR #4121:
URL: https://github.com/apache/gobblin/pull/4121#discussion_r2221902772

##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
         boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
         if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
-          // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
-          this.eventProducer.emitObservabilityEvent(jobStatus);
+          if (isErrorClassificationEnabled) {
+            long startTime = System.currentTimeMillis();
+            if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+              List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+                  TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+              long process_duration = System.currentTimeMillis() - startTime;
+              log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);
+              try {
+                Issue finalIssue = errorClassifier.classifyEarlyStopWithDefault(issues);
+                if (finalIssue != null) {
+                  jobIssueEventHandler.LogFinalError(finalIssue, flowName, flowGroup, String.valueOf(flowExecutionId),
+                      jobName);
+                  long final_duration = System.currentTimeMillis() - startTime;
+                  log.info("Classified issues for job: {}, duration: {} ms", jobStatus,final_duration);
+                }
+              } catch (Exception e) {
+                log.error("Error classifying issues for job: {}", jobStatus, e);
+                long final_duration = System.currentTimeMillis() - startTime;
+                log.info("Error classification for job: {}, duration: {} ms",
+                    jobStatus, final_duration);

Review Comment:
   Added only constituents of contextId for logging.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
         boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
         if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
-          // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
-          this.eventProducer.emitObservabilityEvent(jobStatus);
+          if (isErrorClassificationEnabled) {
+            long startTime = System.currentTimeMillis();

Review Comment:
   Done

Issue Time Tracking
-------------------

    Worklog Id:     (was: 975621)
    Time Spent: 3h 20m  (was: 3h 10m)

> Implement Error Classification based on execution issues
> --------------------------------------------------------
>
>                 Key: GOBBLIN-2211
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2211
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Implement Error Classification to categorize the failure reason based on
> issues encountered.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)