abhishekmjain commented on code in PR #4121: URL: https://github.com/apache/gobblin/pull/4121#discussion_r2217907400
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java: ########## @@ -186,8 +190,15 @@ binding time (optionally bound classes cannot have names associated with them), if (serviceConfig.isJobStatusMonitorEnabled()) { binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class); + binder.bind(ErrorClassifier.class); + binder.bind(ErrorPatternStore.class) + .to(getClassByNameOrAlias(ErrorPatternStore.class, serviceConfig.getInnerConfig(), + ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS, + InMemoryErrorPatternStore.class.getName())); + binder.bind(ErrorPatternStore.class).to(MysqlErrorPatternStore.class); } - + binder.bind(MysqlErrorPatternStore.class); + binder.bind(InMemoryErrorPatternStore.class); Review Comment: Are these needed if we bind `ErrorPatternStore.class` based on config on line 194? ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java: ########## @@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) { boolean retryRequired = modifyStateIfRetryRequired(jobStatus); if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) { - // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS - this.eventProducer.emitObservabilityEvent(jobStatus); + if (isErrorClassificationEnabled) { + long startTime = System.currentTimeMillis(); Review Comment: `startTime` can be initialized inside the `if` block below. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java: ########## @@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) { boolean retryRequired = modifyStateIfRetryRequired(jobStatus); if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) { - // do not send event if retry is 
required, because it can alert users to re-submit a job that is already set to be retried by GaaS - this.eventProducer.emitObservabilityEvent(jobStatus); + if (isErrorClassificationEnabled) { + long startTime = System.currentTimeMillis(); + if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) { + List<Issue> issues = jobIssueEventHandler.getErrorListForClassification( + TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties())); + long process_duration = System.currentTimeMillis() - startTime; + log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration); Review Comment: Fix indentation. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java: ########## @@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) { boolean retryRequired = modifyStateIfRetryRequired(jobStatus); if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) { - // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS - this.eventProducer.emitObservabilityEvent(jobStatus); + if (isErrorClassificationEnabled) { + long startTime = System.currentTimeMillis(); + if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) { + List<Issue> issues = jobIssueEventHandler.getErrorListForClassification( + TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties())); + long process_duration = System.currentTimeMillis() - startTime; + log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration); Review Comment: The log message has 2 `{}` placeholders, but 5 arguments are being passed. ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java: ########## @@ -35,6 +35,9 @@ public 
interface IssueRepository { List<Issue> getAll() throws TroubleshooterException; + List<Issue> getAllTopRecentErrors(int limit) Review Comment: nit: can we call it `getMostRecentErrors`? ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java: ########## @@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) { boolean retryRequired = modifyStateIfRetryRequired(jobStatus); if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) { - // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS - this.eventProducer.emitObservabilityEvent(jobStatus); + if (isErrorClassificationEnabled) { + long startTime = System.currentTimeMillis(); + if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) { + List<Issue> issues = jobIssueEventHandler.getErrorListForClassification( + TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties())); + long process_duration = System.currentTimeMillis() - startTime; + log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration); + try { + Issue finalIssue = errorClassifier.classifyEarlyStopWithDefault(issues); + if (finalIssue != null) { + jobIssueEventHandler.LogFinalError(finalIssue, flowName, flowGroup, String.valueOf(flowExecutionId), + jobName); + long final_duration = System.currentTimeMillis() - startTime; + log.info("Classified issues for job: {}, duration: {} ms", jobStatus,final_duration); + } + } catch (Exception e) { + log.error("Error classifying issues for job: {}", jobStatus, e); + long final_duration = System.currentTimeMillis() - startTime; + log.info("Error classification for job: {}, duration: {} ms", jobStatus, final_duration); Review Comment: Since `jobStatus` contains a lot of properties, let's skip adding it to the logs.
We can use the constituents of `contextId` for logging instead. Also, if we want to log the final duration in all scenarios, let's log it once, outside the try-catch block, instead of repeating it in each branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org