This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea96226ce [GOBBLIN-2119] ignore adding deadline dag actions if they are already present (#4008)
2ea96226ce is described below

commit 2ea96226cedbc8286952029101d8acb82cecadc7
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jul 23 13:42:39 2024 -0700

    [GOBBLIN-2119] ignore adding deadline dag actions if they are already present (#4008)
    
    * ignore adding deadline dag actions if they are already present
    * fix bug
---
 .../MysqlMultiActiveLeaseArbiter.java              |  2 +-
 .../modules/orchestration/proc/DagProcUtils.java   | 26 +++++++++++++++++++---
 .../orchestration/proc/ReevaluateDagProc.java      | 11 +++++----
 .../service/monitoring/KafkaJobStatusMonitor.java  |  6 ++---
 .../orchestration/proc/LaunchDagProcTest.java      |  9 +++++---
 5 files changed, 38 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 1a6032e443..0aeca9a063 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -260,7 +260,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
             leaseParams);
         int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
             ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
-                .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random() 
* DELAY_FOR_RETRY_RANGE_MILLIS)
+                .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) (Math.random() 
* DELAY_FOR_RETRY_RANGE_MILLIS))
                 .build());
        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, 
Optional.empty(),
            adoptConsensusFlowExecutionId);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 1f4c504ed9..3e0f91c393 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.sql.SQLIntegrityConstraintViolationException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -193,15 +194,34 @@ public class DagProcUtils {
 
   private static void 
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
       throws IOException {
-    dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(), 
dagNode.getValue().getFlowName(),
-        dagNode.getValue().getFlowExecutionId(), 
dagNode.getValue().getJobName(),
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(dagNode.getValue().getFlowGroup(),
+        dagNode.getValue().getFlowName(), 
dagNode.getValue().getFlowExecutionId(), dagNode.getValue().getJobName(),
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+    try {
+      dagManagementStateStore.addDagAction(dagAction);
+    } catch (IOException e) {
+      if (e.getCause() != null && e.getCause() instanceof 
SQLIntegrityConstraintViolationException) {
+        // delete old dag action and have a new deadline dag proc with the new 
deadline time
+        dagManagementStateStore.deleteDagAction(dagAction);
+        log.warn("Duplicate ENFORCE_JOB_START_DEADLINE Dag Action is being 
created. Ignoring... " + e.getMessage());
+        dagManagementStateStore.addDagAction(dagAction);
+      }
+    }
   }
 
   public static void 
sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, DagActionStore.DagAction launchDagAction)
       throws IOException {
-    dagManagementStateStore.addFlowDagAction(launchDagAction.getFlowGroup(), 
launchDagAction.getFlowName(),
+    DagActionStore.DagAction dagAction = 
DagActionStore.DagAction.forFlow(launchDagAction.getFlowGroup(), 
launchDagAction.getFlowName(),
         launchDagAction.getFlowExecutionId(), 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+    try {
+      dagManagementStateStore.addDagAction(dagAction);
+    } catch (IOException e) {
+      if (e.getCause() != null && e.getCause() instanceof 
SQLIntegrityConstraintViolationException) {
+        dagManagementStateStore.deleteDagAction(dagAction);
+        log.warn("Duplicate ENFORCE_FLOW_FINISH_DEADLINE Dag Action is being 
created. Ignoring... " + e.getMessage());
+        dagManagementStateStore.addDagAction(dagAction);
+      }
+    }
   }
 
   public static long getDefaultJobStartDeadline(Config config) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index f4e02b8475..8c7a8c8abd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -82,12 +82,11 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);
 
     if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
-      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
-      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
-      throw new RuntimeException(String.format("Job status for dagNode %s is 
%s. Re-evaluate dag action are created for"
-              + " new jobs with no job status when there are multiple of them 
to run next; or when a job finishes with status - %s",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
+      // this may happen if adding job status in the store failed/delayed 
after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is 
%s. Re-evaluate dag action should have been "
+              + "created only for finished status - %s. This may happen if 
reevaluate dag action launched reevaluate dag "
+              + "proc before job status is updated in the store in 
KafkaJobStatusMonitor", dagNodeId, executionStatus,
+          FlowStatusGenerator.FINISHED_STATUSES));
     }
 
     // get the dag after updating dag node status
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9a9aac1c60..d5692be371 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -235,8 +235,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
             if (updatedJobStatus.getRight() == NewState.FINISHED) {
               try {
                 this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
-              } catch (Exception e) {
-                if (isExceptionInstanceOf(e, nonRetryableExceptions)) {
+              } catch (IOException e) {
+                if (e.getCause() != null && 
isThrowableInstanceOf(e.getCause(), nonRetryableExceptions)) {
                   // todo - add metrics
                   log.warn("Duplicate REEVALUATE Dag Action is being created. 
Ignoring... " + e.getMessage());
                 } else {
@@ -426,7 +426,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
   protected abstract org.apache.gobblin.configuration.State 
parseJobStatus(GobblinTrackingEvent event);
 
-  public static boolean isExceptionInstanceOf(Exception exception, 
List<Class<? extends Exception>> typesList) {
+  public static boolean isThrowableInstanceOf(Throwable exception, 
List<Class<? extends Exception>> typesList) {
     return typesList.stream().anyMatch(e -> e.isInstance(exception));
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 4027a11c7e..5b36851ebf 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.hadoop.fs.Path;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -55,13 +54,17 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.util.ConfigUtils;
 
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -123,7 +126,7 @@ public class LaunchDagProcTest {
         .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
 
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
-        .addFlowDagAction(any(), any(), anyLong(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
+        .addJobDagAction(any(), any(), anyLong(), 
eq(DagActionStore.NO_JOB_NAME_DEFAULT), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
   }
 
   @Test

Reply via email to