This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eeb51428c [GOBBLIN-1930] Improve Multi-active related logs and metrics
(#3800)
eeb51428c is described below
commit eeb51428ccbc7231a2831c26888827c043cdf3c0
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 17 16:33:32 2023 -0700
[GOBBLIN-1930] Improve Multi-active related logs and metrics (#3800)
* Improve Multi-active related logs and metrics
* Add more metrics and logs around forwarding dag action to DagManager
* Improve logs in response to review comments
* Replace flow execution id with trigger timestamp from multi-active
* Update flow action execution id within lease arbiter
* Fix test & make Lease Statuses more lean
* Update javadoc
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 6 ++++
.../apache/gobblin/runtime/api/DagActionStore.java | 9 ++++++
.../runtime/api/MultiActiveLeaseArbiter.java | 27 ++++++++++++-----
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 35 ++++++++++++----------
.../gobblin/runtime/metrics/RuntimeMetrics.java | 17 ++++++-----
.../api/MysqlMultiActiveLeaseArbiterTest.java | 29 ++++++++++--------
.../service/modules/orchestration/DagManager.java | 19 ++++++++----
.../modules/orchestration/DagManagerMetrics.java | 13 ++++++++
.../modules/orchestration/FlowTriggerHandler.java | 6 ++--
.../modules/orchestration/Orchestrator.java | 11 ++++++-
.../monitoring/DagActionStoreChangeMonitor.java | 24 +++++++++------
.../orchestration/FlowTriggerHandlerTest.java | 4 +--
12 files changed, 139 insertions(+), 61 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 1a544a3df..aff097bbd 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -43,6 +43,12 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT =
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX +
".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT
= GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX +
".failedToSetReminderCount";
+ // DagManager Related Metrics
+ public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX +
".dagManager";
+ public static final String
+ DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX +
".failedLaunchEventsOnStartupCount";
+ public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT =
DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";
+
//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX
+ ".jobStatusPoll.time";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index 3e11d7c72..eb26acd16 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -45,6 +45,15 @@ public interface DagActionStore {
public FlowId getFlowId() {
return new
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}
+
+ /**
+ * Return a copy of the flow action with its flow execution id replaced by
the agreed upon event time to easily track the flow
+ */
+ public static DagActionStore.DagAction
updateFlowExecutionId(DagActionStore.DagAction flowAction,
+ long eventTimeMillis) {
+ return new DagActionStore.DagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index faacb0995..253db49ba 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -79,28 +79,41 @@ public interface MultiActiveLeaseArbiter {
class NoLongerLeasingStatus extends LeaseAttemptStatus {}
/*
- The participant calling this method acquired the lease for the event in
question. The class contains the
- `eventTimestamp` associated with the lease as well as the time the caller
obtained the lease or
- `leaseAcquisitionTimestamp`.
+ The participant calling this method acquired the lease for the event in
question. `Flow action`'s flow execution id
+ is the timestamp associated with the lease and the time the caller obtained
the lease is stored within the
+ `leaseAcquisitionTimestamp` field.
*/
@Data
class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
- private final long eventTimestamp;
private final long leaseAcquisitionTimestamp;
+
+ /**
+ * @return event time in millis since epoch for the event of this lease
acquisition
+ */
+ public long getEventTimeMillis() {
+ return Long.parseLong(flowAction.getFlowExecutionId());
+ }
}
/*
This flow action event already has a valid lease owned by another
participant.
- `eventTimeMillis` is the timestamp the lease is associated with, which may
be a different timestamp for the same flow
- action corresponding to the same instance of the event or a distinct one.
+ `Flow action`'s flow execution id is the timestamp the lease is associated
with; however, the flow action event it
+ corresponds to may be a different and distinct occurrence of the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before
this participant should return to check if
the lease has completed or expired
*/
@Data
class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction flowAction;
- private final long eventTimeMillis;
private final long minimumLingerDurationMillis;
+
+ /**
+ * Returns event time in millis since epoch for the event whose lease was
obtained by another participant.
+ * @return event time in millis since epoch
+ */
+ public long getEventTimeMillis() {
+ return Long.parseLong(flowAction.getFlowExecutionId());
+ }
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 4c2e8d2da..c6161d936 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -41,6 +41,8 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
+
/**
* MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
@@ -242,7 +244,6 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
- Thread.sleep(10000);
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
@@ -253,7 +254,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
return numRowsDeleted;
}, true);
- } catch (InterruptedException | IOException e) {
+ } catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded
growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
@@ -307,7 +308,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
- log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Reminder event time"
+ log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", flowAction, isReminderEvent ?
"reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
@@ -320,16 +321,18 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
+ DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
- flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeasedToAnotherStatus(flowAction,
dbEventTimestamp.getTime(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
+ DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
- flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
- return new LeasedToAnotherStatus(flowAction,
dbCurrentTimestamp.getTime(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
@@ -515,16 +518,16 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
+ DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
- log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", flowAction,
+ log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
- return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
- selectInfoResult.getLeaseAcquisitionTimeMillis().get());
+ return new LeaseObtainedStatus(updatedFlowAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
- flowAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
+ updatedFlowAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get() +
selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ?
dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
@@ -599,22 +602,22 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
- updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimestamp()), UTC_CAL.get());
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimeMillis()), UTC_CAL.get());
updateStatement.setTimestamp(++i, new
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{},
eventTimestamp: {}] - FAILED to complete because "
+ "lease expired or event cleaned up before host completed
required actions", flowAction,
- status.getEventTimestamp());
+ status.getEventTimeMillis());
return false;
}
if( numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{},
eventTimestamp: {}] - COMPLETED, no longer leasing"
- + " this event after this.", flowAction,
status.getEventTimestamp());
+ + " this event after this.", flowAction,
status.getEventTimeMillis());
return true;
};
throw new IOException(String.format("Attempt to complete lease use:
[%s, eventTimestamp: %s] - updated more "
- + "rows than expected", flowAction,
status.getEventTimestamp()));
+ + "rows than expected", flowAction,
status.getEventTimeMillis()));
}, true);
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 33734cfcb..8fc1258ab 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.message.processed";
public static final String
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specstoreMonitor.produce.to.consume.delay";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.kills.invoked";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.message.processed";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.resumes.invoked";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.flows.launched";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.unexpected.errors";
- public static final String
- GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.produce.to.consume.delay";
+ public static final String DAG_ACTION_STORE_MONITOR_PREFIX =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT =
DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS =
DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
+ public static final String
+ GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.unexpected.errors";
public static final String
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.time.to.check.quota";
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 15090e8f1..7bafc78ff 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -88,10 +88,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
- Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <=
+ Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH)));
+ new DagActionStore.DagAction(flowGroup, flowName,
String.valueOf(firstObtainedStatus.getEventTimeMillis()),
+ DagActionStore.FlowActionType.LAUNCH)));
// Verify that different DagAction types for the same flow can have leases
at the same time
DagActionStore.DagAction killDagAction = new
@@ -102,7 +103,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
Assert.assertTrue(
- killObtainedStatus.getLeaseAcquisitionTimestamp() >=
killObtainedStatus.getEventTimestamp());
+ killObtainedStatus.getLeaseAcquisitionTimestamp() >=
killObtainedStatus.getEventTimeMillis());
// Tests CASE 2 of acquire lease for a flow action event that already has
a valid lease for the same event in db
// Very little time should have passed if this test directly follows the
one above so this call will be considered
@@ -112,7 +113,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
- Assert.assertEquals(firstObtainedStatus.getEventTimestamp(),
secondLeasedToAnotherStatus.getEventTimeMillis());
+ Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis()
> 0);
// Tests CASE 3 of trying to acquire a lease for a distinct flow action
event, while the previous event's lease is
@@ -124,7 +125,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(thirdLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
- Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() >
firstObtainedStatus.getEventTimestamp());
+ Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() >
firstObtainedStatus.getEventTimeMillis());
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() <
LINGER);
// Tests CASE 4 of lease out of date
@@ -134,14 +135,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(fourthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
- Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() >
eventTimeMillis + LINGER);
- Assert.assertTrue(fourthObtainedStatus.getEventTimestamp()
+ Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() >
eventTimeMillis + LINGER);
+ Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis()
<= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
// Tests CASE 5 of no longer leasing the same event in DB
// done immediately after previous lease obtainment so should be marked as
the same event
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
- Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimestamp() < EPSILON);
+ Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(fifthLaunchStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
@@ -154,7 +155,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(sixthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
- Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
+ Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
@@ -216,8 +217,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ DagActionStore.DagAction updatedResumeDagAction =
DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
- resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+ updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction,
@@ -281,7 +284,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
- Assert.assertTrue(obtainedStatus.getEventTimestamp() >
selectInfoResult.getEventTimeMillis());
+ Assert.assertTrue(obtainedStatus.getEventTimeMillis() >
selectInfoResult.getEventTimeMillis());
Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() >
selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
}
@@ -296,8 +299,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ DagActionStore.DagAction updatedResumeDagAction =
DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
- resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+ updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Sleep enough time for the event to have been considered distinct
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c8d6bf598..b4ec9c0ce 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -306,6 +306,8 @@ public class DagManager extends AbstractIdleService {
* Note this should only be called from the {@link Orchestrator} or {@link
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
*/
public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist,
boolean setStatus) throws IOException {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add dag (persist: {}, setStatus: {}): {}", persist, setStatus,
dag);
if (!isActive) {
log.warn("Skipping add dag because this instance of DagManager is not
active for dag: {}", dag);
return;
@@ -509,14 +511,19 @@ public class DagManager extends AbstractIdleService {
// Upon handling the action, delete it so on leadership change this is
not duplicated
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
- log.warn("Could not create URI object for flowId {} due to exception
{}", flowId, e.getMessage());
+ log.warn(String.format("Could not create URI object for flowId %s due to
exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
+ log.warn(String.format("Spec not found for flowId %s due to exception",
flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (IOException e) {
- log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag
action from dagActionStore (check "
- + "stacktrace) due to exception {}", flowId, e.getMessage());
+ log.warn(String.format("Failed to add Job Execution Plan for flowId %s
OR delete dag action from dagActionStore "
+ + "(check stacktrace) due to exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (InterruptedException e) {
- log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {}. Exception: ", flowId, e);
+ log.warn(String.format("SpecCompiler failed to reach healthy state
before compilation of flowId %s due to "
+ + "exception", flowId), e);
+ this.dagManagerMetrics.incrementFailedLaunchCount();
}
}
@@ -620,6 +627,8 @@ public class DagManager extends AbstractIdleService {
}
//Initialize dag.
initialize(dag);
+ } else {
+ log.warn("Null dag despite non-empty queue; ignoring the dag");
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a5f34cff7..6d6c545b5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -75,6 +75,9 @@ public class DagManagerMetrics {
private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters
= Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorSlaExceededMeters =
Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorJobSentMeters =
Maps.newConcurrentMap();
+
+ // Metrics for unexpected flow handling failures
+ private ContextAwareCounter failedLaunchEventsOnActivationCount;
MetricContext metricContext;
public DagManagerMetrics(MetricContext metricContext) {
@@ -100,6 +103,9 @@ public class DagManagerMetrics {
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
allRunningMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+ failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter(
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+
ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT));
}
}
@@ -199,6 +205,13 @@ public class DagManagerMetrics {
}
}
+ // Increment the count for num of failed launches during leader activation
+ public void incrementFailedLaunchCount() {
+ if (this.metricContext != null) {
+ this.failedLaunchEventsOnActivationCount.inc();
+ }
+ }
+
private List<ContextAwareCounter>
getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs,
AzkabanProjectConfig.USER_TO_PROXY, null);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 6d9dcc9d6..8abaa209c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -117,13 +117,15 @@ public class FlowTriggerHandler {
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseObtainedStatus.getFlowAction(),
- leaseObtainedStatus.getEventTimestamp());
+ leaseObtainedStatus.getEventTimeMillis());
return;
}
// If persisting the flow action failed, then we set another trigger
for this event to occur immediately to
// re-attempt handling the event
+ DagActionStore.DagAction updatedFlowAction =
DagActionStore.DagAction.updateFlowExecutionId(flowAction,
+ leaseObtainedStatus.getEventTimeMillis());
scheduleReminderForEvent(jobProps,
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
leaseObtainedStatus.getEventTimestamp(), 0L),
+ new
MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7afd8bba4..bcb174320 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
@@ -93,6 +94,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private Optional<Meter> flowOrchestrationFailedMeter;
@Getter
private Optional<Timer> flowOrchestrationTimer;
+ private Optional<Counter> flowFailedForwardToDagManagerCounter;
@Setter
private FlowStatusGenerator flowStatusGenerator;
@@ -137,12 +139,14 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
this.flowOrchestrationSuccessFulMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
this.flowOrchestrationFailedMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
this.flowOrchestrationTimer =
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+ this.flowFailedForwardToDagManagerCounter =
Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT));
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
} else {
this.metricContext = null;
this.flowOrchestrationSuccessFulMeter = Optional.absent();
this.flowOrchestrationFailedMeter = Optional.absent();
this.flowOrchestrationTimer = Optional.absent();
+ this.flowFailedForwardToDagManagerCounter = Optional.absent();
this.eventSubmitter = Optional.absent();
}
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
@@ -337,6 +341,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
+ _log.warn("Flow: {} submitted to dagManager failed to compile and
produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
}
@@ -347,9 +352,13 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
+ String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
+ _log.warn("Orchestrator call - " + failureMessage, ex);
+ if (this.flowFailedForwardToDagManagerCounter.isPresent()) {
+ this.flowFailedForwardToDagManagerCounter.get().inc();
+ }
if (this.eventSubmitter.isPresent()) {
// pronounce failed before stack unwinds, to ensure flow not marooned
in `COMPILED` state; (failure likely attributable to DB connection/failover)
- String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 870b68f53..1435e076a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -58,8 +58,10 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
private ContextAwareMeter killsInvoked;
private ContextAwareMeter resumesInvoked;
private ContextAwareMeter flowsLaunched;
+ private ContextAwareMeter failedFlowLaunchSubmissions;
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
+ private ContextAwareMeter messageFilteredOutMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
private volatile Long produceToConsumeDelayValue = -1L;
@@ -130,19 +132,23 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
String changeIdentifier = tid + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
dagActionsSeenCache, operation,
produceTimestamp.toString())) {
+ this.messageFilteredOutMeter.mark();
return;
}
+ // Used to easily log information to identify the dag action
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+ dagActionType);
+
// We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be
processed. DELETEs require no action.
try {
if (operation.equals("INSERT")) {
+ log.info("DagAction change ({}) received for flow: {}", dagActionType,
dagAction);
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
- log.info("Received insert dag action and about to send resume flow
request");
dagManager.handleResumeFlowRequest(flowGroup,
flowName,Long.parseLong(flowExecutionId));
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
- log.info("Received insert dag action and about to send kill flow
request");
dagManager.handleKillFlowRequest(flowGroup, flowName,
Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH))
{
@@ -150,10 +156,8 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
if (!this.isMultiActiveSchedulerEnabled) {
this.unexpectedErrors.mark();
throw new RuntimeException(String.format("Received LAUNCH
dagAction while not in multi-active scheduler "
- + "mode for flowAction: %s",
- new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, dagActionType)));
+ + "mode for flowAction: %s", dagAction));
}
- log.info("Received insert dag action and about to forward launch
request to DagManager");
submitFlowToDagManagerHelper(flowGroup, flowName);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL,
RESUME, or LAUNCH", dagActionType);
@@ -191,19 +195,19 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}",
flowId, e.getMessage());
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} due to
exception {}", flowId, e.getMessage());
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {}. Exception: ", flowId, e);
- this.unexpectedErrors.mark();
+ this.failedFlowLaunchSubmissions.mark();
return;
}
// Only mark this if the dag was successfully added
@@ -216,8 +220,10 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.killsInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
+ this.failedFlowLaunchSubmissions =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+ this.messageFilteredOutMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
index d580704d2..672892673 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandlerTest.java
@@ -34,9 +34,9 @@ public class FlowTriggerHandlerTest {
String cronExpressionSuffix =
truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction flowAction = new
DagActionStore.DagAction("flowName", "flowGroup",
- "999999", DagActionStore.FlowActionType.LAUNCH);
+ String.valueOf(eventToRevisit), DagActionStore.FlowActionType.LAUNCH);
MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus =
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
eventToRevisit, minimumLingerDurationMillis);
+ new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
minimumLingerDurationMillis);
/**
* Remove first two fields from cron expression representing seconds and
minutes to return truncated cron expression