phet commented on code in PR #3997:
URL: https://github.com/apache/gobblin/pull/3997#discussion_r1671665510


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -26,14 +26,8 @@
  * Consumption of the Dags happen through {@link DagTaskStream}.
  */
 public interface DagManagement {
-
-  /**
-   * Used to add a dagAction event to DagManagement
-   */
-  void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
-
   /**
-   * Used to add reminder dagActions to the queue that already contain an eventTimestamp from the previous lease attempt
+   * Used to add {@link DagActionStore.LeaseParams} to the queue that already contain an eventTimestamp from the previous lease attempt
    */
-  void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) throws IOException;
+  void addDagAction(DagActionStore.LeaseParams reminderLeaseParams) throws IOException;

Review Comment:
   should the param still be named `reminder...`, now that this method also takes the non-reminder dag actions?
   
   also (javadoc) - in the general case, the `LeaseParams` won't necessarily "already contain ... from the prev. lease attempt", will it?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -107,23 +107,10 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
     this.dagManagementStateStore = dagManagementStateStore;
   }
 
-  @Override
-  public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
-    // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add original (non-reminder) dagAction {}", dagAction);
-
-    if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction, false, System.currentTimeMillis()))) {
-      throw new RuntimeException(String.format("Could not add dag action to the queue %s", dagAction));
-    }
-  }
-
-  @Override
-  public synchronized void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) {
-    // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add reminder dagAction {}", reminderLeaseParams);
-
+  public synchronized void addDagAction(DagActionStore.LeaseParams reminderLeaseParams) {
+    log.info("Add {} to queue", reminderLeaseParams);

Review Comment:
   nit: `"Adding {} to queue..."` (trailing "..." intended literally)
   
   otherwise: `"About to add {} to queue"`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -230,7 +235,16 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           if (this.dagProcEngineEnabled && DagProcUtils.isJobLevelStatus(jobName)) {
             if (updatedJobStatus.getRight() == NewState.FINISHED) {
               // todo - retried/resumed jobs *may* not be handled here, we may want to create their dag action elsewhere
-              this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+              try {
+                this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+              } catch (Exception e) {
+                if (isExceptionInstanceOf(e, nonTransientExceptions)) {
+                  // todo - add metrics
+                  log.error(e.getMessage());

Review Comment:
   `log.warn` is probably more appropriate.  it would also be good to preface the exception msg w/ a context lead-in (especially as there's no stack trace for figuring out where in the code the logging happened!).  in addition, let's mention that the error is being swallowed/ignored.
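
   A minimal sketch of the kind of message I have in mind. Everything here is illustrative: `SwallowedExceptionLogging`, `describeIgnored` and `warnIgnored` are hypothetical names, `java.util.logging` stands in for the class's existing `log` only to keep the snippet self-contained, and `flowExecutionId` is assumed to be a `long`.

```java
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: builds a warning that names the
// operation, the flow/job identifiers, and the fact that the error is ignored.
class SwallowedExceptionLogging {
  // Stand-in for the monitor's own logger (assumption made for self-containment).
  private static final Logger LOG = Logger.getLogger(SwallowedExceptionLogging.class.getName());

  // Context lead-in first, original exception message last.
  static String describeIgnored(String flowGroup, String flowName, long flowExecutionId,
      String jobName, Exception e) {
    return String.format(
        "Ignoring (swallowing) %s while adding REEVALUATE dag action for "
            + "[flowGroup=%s, flowName=%s, flowExecutionId=%s, jobName=%s]: %s",
        e.getClass().getSimpleName(), flowGroup, flowName, flowExecutionId, jobName, e.getMessage());
  }

  // Logs at warning level rather than error, since processing continues.
  static void warnIgnored(String flowGroup, String flowName, long flowExecutionId,
      String jobName, Exception e) {
    LOG.warning(describeIgnored(flowGroup, flowName, flowExecutionId, jobName, e));
  }
}
```

   With the context in the message itself, the log line can be traced back to this catch block even without a stack trace.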



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -104,7 +106,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
   private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
-      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up
+      // keeping the retry timeout less until we configure retryer to retry only the transient exceptions
+      RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(30L), // after 30 minutes, presume non-transient and give up

Review Comment:
   I'm not convinced that 24 hours is the wrong length, esp. as you'll now skip 
the `SQLIntegrityConstraintViolationException`s.  but if you really think it 
needs shortening, I wouldn't go so far as only 30 mins.  maybe 6 or 12 hours at 
least...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -120,6 +123,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   private final GaaSJobObservabilityEventProducer eventProducer;
   private final DagManagementStateStore dagManagementStateStore;
   private final boolean dagProcEngineEnabled;
+  private final List<Class<? extends Exception>> nonTransientExceptions = Collections.singletonList(SQLIntegrityConstraintViolationException.class);

Review Comment:
   I wouldn't name these "non-transient" exceptions, so much as 
"permitted"/"allowable" exceptions



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -112,7 +112,7 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta
         case LAUNCH :
         case REEVALUATE :
         case RESUME:
-          dagManagement.addDagAction(dagAction);
+          dagManagement.addDagAction(new DagActionStore.LeaseParams(dagAction, false, System.currentTimeMillis()));

Review Comment:
   nit: suggest a `DagActionStore.LeaseParams` ctor that takes only a 
`DagAction` and defaults to `false` and `System.currentTimeMillis()`
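
   Roughly what I mean, sketched against simplified stand-ins rather than the real classes. `DagActionSketch`, the single-field `DagAction`, and the field names `isReminder` / `eventTimeMillis` are assumptions for illustration; only the three-arg shape `(dagAction, false, System.currentTimeMillis())` comes from the diff above.

```java
// Hypothetical, simplified stand-ins for DagActionStore.DagAction and
// DagActionStore.LeaseParams; the real classes carry more state.
class DagActionSketch {
  static final class DagAction {
    final String flowName;

    DagAction(String flowName) {
      this.flowName = flowName;
    }
  }

  static final class LeaseParams {
    final DagAction dagAction;
    final boolean isReminder;     // assumed field name
    final long eventTimeMillis;   // assumed field name

    // Full ctor, mirroring the three arguments used at the call site in the diff.
    LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis) {
      this.dagAction = dagAction;
      this.isReminder = isReminder;
      this.eventTimeMillis = eventTimeMillis;
    }

    // Convenience ctor: a fresh (non-reminder) action stamped with the current time.
    LeaseParams(DagAction dagAction) {
      this(dagAction, false, System.currentTimeMillis());
    }
  }
}
```

   The call site would then shrink to something like `dagManagement.addDagAction(new DagActionStore.LeaseParams(dagAction))`.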



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java:
##########
@@ -70,17 +70,6 @@ public static DagActionStore getTestDagActionStore(ITestMetastoreDatabase testDb
     return new MysqlDagActionStore(config);
   }
 
-  @Test
-  public void testAddAction() throws Exception {
-    this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
-    //Should not be able to add KILL again when previous one exist
-    Assert.expectThrows(IOException.class,
-        () -> this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
-    //Should be able to add a RESUME action for same execution as well as KILL for another execution of the flow
-    this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
-    this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL);
-  }
-

Review Comment:
   why get rid of this?  I don't recall a code change that would motivate abandoning this test...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -409,4 +424,12 @@ public static long getExecutionIdFromTableName(String tableName) {
 
   protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
 
+  public static boolean isExceptionInstanceOf(Exception exception, List<Class<? extends Exception>> typesList) {
+    for (Class<? extends Exception> type : typesList) {
+      if (type.isInstance(exception)) {
+        return true;
+      }
+    }
+    return false;
+  }

Review Comment:
   ```
   return typesList.stream().anyMatch(t -> t.isInstance(exception));
   ```
   ?
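
   For reference, the whole method in that style, as a self-contained sketch. The wrapper class name `ExceptionTypeMatcher` is hypothetical; the method signature is the one from the diff above.

```java
import java.util.List;

// Hypothetical holder class; in the PR the method lives on KafkaJobStatusMonitor.
class ExceptionTypeMatcher {
  // True if the exception is an instance of any listed type (subclasses included).
  // Same contract as the loop-based version, expressed as a stream.
  static boolean isExceptionInstanceOf(Exception exception, List<Class<? extends Exception>> typesList) {
    return typesList.stream().anyMatch(type -> type.isInstance(exception));
  }
}
```

   Behavior should be identical to the loop, including returning `false` for an empty list, since `anyMatch` on an empty stream is `false`.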



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -331,6 +345,7 @@ static Pair<org.apache.gobblin.configuration.State, NewState> recalcJobStatus(or
 
   private static NewState newState(org.apache.gobblin.configuration.State jobStatus, List<org.apache.gobblin.configuration.State> states) {
     if (isNewStateTransitionToFinal(jobStatus, states)) {
+      log.info("Flow ");

Review Comment:
   snuck in?  (the message is just `"Flow "`, which looks like leftover debugging)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

