abhishekmjain commented on code in PR #4121:
URL: https://github.com/apache/gobblin/pull/4121#discussion_r2217907400


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -186,8 +190,15 @@ binding time (optionally bound classes cannot have names associated with them),
 
     if (serviceConfig.isJobStatusMonitorEnabled()) {
       binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+      binder.bind(ErrorClassifier.class);
+      binder.bind(ErrorPatternStore.class)
+          .to(getClassByNameOrAlias(ErrorPatternStore.class, serviceConfig.getInnerConfig(),
+              ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS,
+              InMemoryErrorPatternStore.class.getName()));
+      binder.bind(ErrorPatternStore.class).to(MysqlErrorPatternStore.class);
     }
-
+    binder.bind(MysqlErrorPatternStore.class);
+    binder.bind(InMemoryErrorPatternStore.class);

Review Comment:
   Are these needed if we bind `ErrorPatternStore.class` based on config on line 194?
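For illustration, a rough sketch of the idea in plain Java (not Guice, so it runs standalone): the store implementation is chosen in exactly one place, from config with an in-memory default, so the selection has a single source of truth. `ErrorPatternStoreSelection`, the nested stand-in types, and `STORE_CLASS_KEY` are hypothetical; the PR itself uses `getClassByNameOrAlias` with `ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS`.

```java
import java.util.Map;

/**
 * Sketch of config-driven implementation selection. The interface and both
 * implementations are simplified, hypothetical stand-ins for the PR's classes.
 */
public class ErrorPatternStoreSelection {

  public interface ErrorPatternStore {
    String describe();
  }

  public static class InMemoryErrorPatternStore implements ErrorPatternStore {
    @Override
    public String describe() {
      return "in-memory";
    }
  }

  public static class MysqlErrorPatternStore implements ErrorPatternStore {
    @Override
    public String describe() {
      return "mysql";
    }
  }

  // Hypothetical key; the PR reads ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS instead.
  public static final String STORE_CLASS_KEY = "errorPatternStore.class";

  /** The only place the implementation is chosen: the configured class, else the in-memory default. */
  public static ErrorPatternStore createStore(Map<String, String> config) {
    String className = config.getOrDefault(STORE_CLASS_KEY, InMemoryErrorPatternStore.class.getName());
    try {
      return Class.forName(className)
          .asSubclass(ErrorPatternStore.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(createStore(Map.of()).describe());
    System.out.println(createStore(Map.of(STORE_CLASS_KEY, MysqlErrorPatternStore.class.getName())).describe());
  }
}
```

Run as-is, it prints `in-memory` and then `mysql`.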



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
-            // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();

Review Comment:
   `startTime` can be initialized inside the `if` block below.
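A minimal sketch of that restructuring, assuming the event-name check and the classification work can be stood in for by a string and a `Runnable` (both hypothetical here): the clock is read only when the job actually failed.

```java
/**
 * Sketch: start the timer only on the path that does the classification work.
 * eventName and classifyIssues are hypothetical stand-ins for the job's event
 * name property and the issue lookup/classification step.
 */
public class ClassificationTiming {

  /** Returns the elapsed milliseconds, or -1 if the job did not fail and nothing was timed. */
  public static long classifyIfFailed(String eventName, Runnable classifyIssues) {
    if ("FAILED".equals(eventName)) {
      // startTime is declared inside the block that uses it.
      long startTime = System.currentTimeMillis();
      classifyIssues.run();
      return System.currentTimeMillis() - startTime;
    }
    return -1;
  }

  public static void main(String[] args) {
    System.out.println(classifyIfFailed("COMPLETE", () -> { }));     // -1: not timed
    System.out.println(classifyIfFailed("FAILED", () -> { }) >= 0);  // true
  }
}
```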



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
-            // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();
+              if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+                List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+                    TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    long process_duration = System.currentTimeMillis() - startTime;
+                    log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);

Review Comment:
   Fix indentation: these two statements are indented deeper than the `List<Issue> issues` line above them.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
-            // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();
+              if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+                List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+                    TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    long process_duration = System.currentTimeMillis() - startTime;
+                    log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);

Review Comment:
   The log message has two `{}` placeholders, but five arguments are passed.
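To make the mismatch concrete, here is a simplified, hypothetical imitation of SLF4J's `{}` substitution (not the library itself): placeholders are filled left to right and surplus arguments are dropped, so the `duration` slot would end up showing the flow name. `PlaceholderDemo` and the sample values are illustrative only.

```java
/**
 * Simplified imitation of SLF4J-style "{}" substitution: each "{}" takes the next
 * argument in order, placeholders without an argument stay as "{}", and arguments
 * left over after the last placeholder are dropped. Real SLF4J also treats a trailing
 * Throwable as the exception to log and supports escaping; both are omitted here.
 */
public class PlaceholderDemo {

  public static String format(String pattern, Object... args) {
    StringBuilder out = new StringBuilder();
    int argIndex = 0;
    int from = 0;
    int at;
    while ((at = pattern.indexOf("{}", from)) >= 0) {
      out.append(pattern, from, at);
      if (argIndex < args.length) {
        out.append(args[argIndex++]);
      } else {
        out.append("{}");
      }
      from = at + 2;
    }
    out.append(pattern, from, pattern.length());
    return out.toString();
  }

  public static void main(String[] args) {
    String pattern = "Processing issues for job: {}, duration: {} ms";
    // Five arguments, two placeholders: the duration slot receives the flow name.
    System.out.println(format(pattern, "<jobStatus>", "myFlow", 123L, "myJob", 42L));
    // Matching counts: one placeholder per identifier plus the duration.
    System.out.println(format("Processing issues for flow: {}, execution: {}, job: {}, duration: {} ms",
        "myFlow", 123L, "myJob", 42L));
  }
}
```

The first line printed is `Processing issues for job: <jobStatus>, duration: myFlow ms`; the second lines up every value with its label.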



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java:
##########
@@ -35,6 +35,9 @@ public interface IssueRepository {
   List<Issue> getAll()
       throws TroubleshooterException;
 
+  List<Issue> getAllTopRecentErrors(int limit)

Review Comment:
   nit: can we call it `getMostRecentErrors`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
-            // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();
+              if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+                List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+                    TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    long process_duration = System.currentTimeMillis() - startTime;
+                    log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);
+                try {
+                  Issue finalIssue = errorClassifier.classifyEarlyStopWithDefault(issues);
+                  if (finalIssue != null) {
+                    jobIssueEventHandler.LogFinalError(finalIssue, flowName, flowGroup, String.valueOf(flowExecutionId),
+                        jobName);
+                    long final_duration = System.currentTimeMillis() - startTime;
+                    log.info("Classified issues for job: {}, duration: {} ms", jobStatus,final_duration);
+                  }
+                } catch (Exception e) {
+                  log.error("Error classifying issues for job: {}", jobStatus, e);
+                  long final_duration = System.currentTimeMillis() - startTime;
+                  log.info("Error classification for job: {}, duration: {} ms", jobStatus, final_duration);

Review Comment:
   Since `jobStatus` contains a lot of properties, let's not add it to the logs; we can log the constituents of the context ID instead.
   
   Also, if we want to log the final duration in all scenarios, let's log it once, outside the try-catch block, instead of repeating it.
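A rough sketch of both suggestions, under stated assumptions: the classifier is a hypothetical `Supplier`, log lines are collected in a list rather than sent to the PR's logger, and the identifier string format is illustrative, not the real context ID format. The duration is computed and logged once in a `finally` block, which runs on both the success and failure paths.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Sketch: log short job identifiers instead of the full job status, and log the
 * duration once in a finally block instead of in both the try and catch branches.
 */
public class ClassifyWithSingleDurationLog {

  // Captured log lines, so the behaviour can be checked without a logging framework.
  public static final List<String> LOG = new ArrayList<>();

  public static <T> T classifyAndTime(String flowGroup, String flowName, long flowExecutionId,
      String jobName, Supplier<T> classifier) {
    // Illustrative identifier string; not the actual context ID format.
    String jobId = String.format("flowGroup=%s, flowName=%s, flowExecutionId=%d, jobName=%s",
        flowGroup, flowName, flowExecutionId, jobName);
    long startTime = System.currentTimeMillis();
    try {
      return classifier.get();
    } catch (RuntimeException e) {
      // The PR catches Exception; a Supplier cannot throw checked exceptions, hence RuntimeException.
      LOG.add("Error classifying issues for job: " + jobId + ": " + e.getMessage());
      return null;
    } finally {
      // Runs on success and failure alike, so the duration is logged exactly once.
      long duration = System.currentTimeMillis() - startTime;
      LOG.add("Error classification for job: " + jobId + ", duration: " + duration + " ms");
    }
  }

  public static void main(String[] args) {
    classifyAndTime("g", "f", 1L, "j", () -> "issue");
    classifyAndTime("g", "f", 2L, "j", () -> { throw new IllegalStateException("boom"); });
    LOG.forEach(System.out::println);
  }
}
```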



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
