This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2ea96226ce [GOBBLIN-2119] ignore adding deadline dag actions if they are already present (#4008)
2ea96226ce is described below
commit 2ea96226cedbc8286952029101d8acb82cecadc7
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jul 23 13:42:39 2024 -0700
[GOBBLIN-2119] ignore adding deadline dag actions if they are already present (#4008)
* ignore adding deadline dag actions if they are already present
* fix bug
---
.../MysqlMultiActiveLeaseArbiter.java | 2 +-
.../modules/orchestration/proc/DagProcUtils.java | 26 +++++++++++++++++++---
.../orchestration/proc/ReevaluateDagProc.java | 11 +++++----
.../service/monitoring/KafkaJobStatusMonitor.java | 6 ++---
.../orchestration/proc/LaunchDagProcTest.java | 9 +++++---
5 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 1a6032e443..0aeca9a063 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -260,7 +260,7 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
leaseParams);
int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
- .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random()
* DELAY_FOR_RETRY_RANGE_MILLIS)
+ .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) (Math.random()
* DELAY_FOR_RETRY_RANGE_MILLIS))
.build());
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams,
Optional.empty(),
adoptConsensusFlowExecutionId);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 1f4c504ed9..3e0f91c393 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
+import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -193,15 +194,34 @@ public class DagProcUtils {
private static void
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
- dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(),
dagNode.getValue().getFlowName(),
- dagNode.getValue().getFlowExecutionId(),
dagNode.getValue().getJobName(),
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(dagNode.getValue().getFlowGroup(),
+ dagNode.getValue().getFlowName(),
dagNode.getValue().getFlowExecutionId(), dagNode.getValue().getJobName(),
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+ try {
+ dagManagementStateStore.addDagAction(dagAction);
+ } catch (IOException e) {
+ if (e.getCause() != null && e.getCause() instanceof
SQLIntegrityConstraintViolationException) {
+ // delete old dag action and have a new deadline dag proc with the new
deadline time
+ dagManagementStateStore.deleteDagAction(dagAction);
+ log.warn("Duplicate ENFORCE_JOB_START_DEADLINE Dag Action is being
created. Ignoring... " + e.getMessage());
+ dagManagementStateStore.addDagAction(dagAction);
+ }
+ }
}
public static void
sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, DagActionStore.DagAction launchDagAction)
throws IOException {
- dagManagementStateStore.addFlowDagAction(launchDagAction.getFlowGroup(),
launchDagAction.getFlowName(),
+ DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(launchDagAction.getFlowGroup(),
launchDagAction.getFlowName(),
launchDagAction.getFlowExecutionId(),
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+ try {
+ dagManagementStateStore.addDagAction(dagAction);
+ } catch (IOException e) {
+ if (e.getCause() != null && e.getCause() instanceof
SQLIntegrityConstraintViolationException) {
+ dagManagementStateStore.deleteDagAction(dagAction);
+ log.warn("Duplicate ENFORCE_FLOW_FINISH_DEADLINE Dag Action is being
created. Ignoring... " + e.getMessage());
+ dagManagementStateStore.addDagAction(dagAction);
+ }
+ }
}
public static long getDefaultJobStartDeadline(Config config) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index f4e02b8475..8c7a8c8abd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -82,12 +82,11 @@ public class ReevaluateDagProc extends DagProc<Pair<Optional<Dag.DagNode<JobExec
updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);
if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
- log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
- dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
- // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
- throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag action are created for"
- + " new jobs with no job status when there are multiple of them
to run next; or when a job finishes with status - %s",
- dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
+ // this may happen if adding job status in the store failed/delayed
after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag action should have been "
+ + "created only for finished status - %s. This may happen if
reevaluate dag action launched reevaluate dag "
+ + "proc before job status is updated in the store in
KafkaJobStatusMonitor", dagNodeId, executionStatus,
+ FlowStatusGenerator.FINISHED_STATUSES));
}
// get the dag after updating dag node status
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9a9aac1c60..d5692be371 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -235,8 +235,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
if (updatedJobStatus.getRight() == NewState.FINISHED) {
try {
this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
- } catch (Exception e) {
- if (isExceptionInstanceOf(e, nonRetryableExceptions)) {
+ } catch (IOException e) {
+ if (e.getCause() != null &&
isThrowableInstanceOf(e.getCause(), nonRetryableExceptions)) {
// todo - add metrics
log.warn("Duplicate REEVALUATE Dag Action is being created.
Ignoring... " + e.getMessage());
} else {
@@ -426,7 +426,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
protected abstract org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEvent event);
- public static boolean isExceptionInstanceOf(Exception exception,
List<Class<? extends Exception>> typesList) {
+ public static boolean isThrowableInstanceOf(Throwable exception,
List<Class<? extends Exception>> typesList) {
return typesList.stream().anyMatch(e -> e.isInstance(exception));
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 4027a11c7e..5b36851ebf 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
@@ -55,13 +54,17 @@ import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.util.ConfigUtils;
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -123,7 +126,7 @@ public class LaunchDagProcTest {
.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
- .addFlowDagAction(any(), any(), anyLong(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
+ .addJobDagAction(any(), any(), anyLong(),
eq(DagActionStore.NO_JOB_NAME_DEFAULT),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}
@Test