[
https://issues.apache.org/jira/browse/GOBBLIN-2073?focusedWorklogId=921347&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-921347
]
ASF GitHub Bot logged work on GOBBLIN-2073:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/May/24 08:32
Start Date: 30/May/24 08:32
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3952:
URL: https://github.com/apache/gobblin/pull/3952#discussion_r1620079757
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java:
##########
@@ -35,9 +35,9 @@ public class FlowLaunchHandlerTest {
String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new DagActionStore.DagAction("flowName", "flowGroup",
- String.valueOf(flowExecutionId), "jobName", DagActionStore.DagActionType.LAUNCH);
+ String.valueOf(flowExecutionId), "jobName", DagActionStore.DagActionType.LAUNCH, eventToRevisit);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
- new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, minimumLingerDurationMillis);
+ new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, minimumLingerDurationMillis);
Review Comment:
`eventToRevisit` seems misnamed, since it's a timestamp, not an event. Is
it the `consensusTimestampMillis`?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -52,14 +52,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final String flowExecutionId = "12345677";
+ private static final long eventTimeMillis = System.currentTimeMillis();
Review Comment:
Usually test constants should be initialized with a well-known value. Would
that work here?
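For example, a fixed epoch-millis value can stand in for `System.currentTimeMillis()`. The sketch below assumes the arbiter under test accepts any fixed original event time; the holder class `LeaseArbiterTestConstants` and the name `EVENT_TIME_MILLIS` are hypothetical, while the string values are copied from the constants shown in the diff.

```java
/**
 * Hypothetical holder for fixed test inputs. The string values are copied from the
 * test's existing constants; the timestamp is an arbitrary but fixed choice.
 */
final class LeaseArbiterTestConstants {
  static final String FLOW_NAME = "testFlowName";
  static final String JOB_NAME = "testJobName";
  static final String FLOW_EXECUTION_ID = "12345677";

  // 2024-05-30T00:00:00Z in epoch millis. Unlike System.currentTimeMillis(), this is the
  // same on every run, so failures are reproducible and expected values can be written down.
  static final long EVENT_TIME_MILLIS = 1_717_027_200_000L;

  private LeaseArbiterTestConstants() {}
}
```

Expected values in assertions can then be written in terms of `EVENT_TIME_MILLIS` instead of being inferred from wall-clock time at run time.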
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -232,16 +234,19 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
*/
@Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
- throws IOException, InterruptedException, SQLException {
+ throws IOException, SQLException {
+ /*
Review Comment:
Is this supposed to be commented out?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -103,21 +103,23 @@ Tests all cases of trying to acquire a lease (CASES 1-6 detailed below) for a fl
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, true);
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
- Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
- firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ long consensusEventTimeMillis = firstObtainedStatus.getEventTimeMillis();
+ Assert.assertTrue(consensusEventTimeMillis <= firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ // Make sure consensusEventTimeMillis is set and it's not 0 or the original event time
+ Assert.assertTrue(consensusEventTimeMillis != eventTimeMillis && consensusEventTimeMillis != 0);
Review Comment:
Let's use a positive assertion rather than a negative one, as the latter can
hide many possible problems; see:
https://en.wikipedia.org/wiki/Anna_Karenina_principle
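To illustrate the difference, here is a small hypothetical sketch (not Gobblin code). The negative check from the diff accepts clearly wrong values such as `-1` or `Long.MAX_VALUE`. The positive check states the range the value must fall in. How a real test would obtain that expected window (for example, from DB-side timestamps bracketing the call) is an assumption here, and the class and method names are illustrative only.

```java
/** Hypothetical illustration of assertion styles; not Gobblin code. */
final class AssertionStyles {
  private AssertionStyles() {}

  // Negative form from the diff: rules out exactly two bad values and accepts everything else.
  static boolean negativeCheck(long consensusMillis, long originalEventMillis) {
    return consensusMillis != originalEventMillis && consensusMillis != 0;
  }

  // Positive form: states the closed window [lowerMillis, upperMillis] the value must lie in.
  static boolean positiveCheck(long consensusMillis, long lowerMillis, long upperMillis) {
    return lowerMillis <= consensusMillis && consensusMillis <= upperMillis;
  }

  // Fails with the actual value and the expected window, so the message says what went wrong.
  static void assertInWindow(long consensusMillis, long lowerMillis, long upperMillis) {
    if (!positiveCheck(consensusMillis, lowerMillis, upperMillis)) {
      throw new AssertionError("consensus event time " + consensusMillis
          + " not in expected window [" + lowerMillis + ", " + upperMillis + "]");
    }
  }
}
```

When an exact expected value is known, the positive form maps directly onto TestNG's `Assert.assertEquals(actual, expected)`, which also reports both values on failure.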
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -232,16 +234,19 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
*/
@Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
Review Comment:
What's the `F` for in the name `testCond...IfFMatching...`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -49,14 +49,25 @@ class DagAction {
final String flowExecutionId;
final String jobName;
final DagActionType dagActionType;
- final boolean isReminder;
+ boolean isReminder;
+ long eventTimeMillis;
Review Comment:
It looks like an instance could be created without these being set. Shouldn't we
prevent that via `final`?
BTW, what is the "event" behind this time? Is it when the API request came
in to, say, perform a "kill action"? Why is it critical to store that?
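One way to act on the `final` suggestion is sketched below under stated assumptions. `ImmutableDagAction` is a hypothetical, trimmed-down stand-in for `DagAction`; its `withEventTimeMillis` and `asReminder` methods and the two-constant enum are illustrative, not Gobblin's API. Every field is `final` and must be supplied through the constructor, and an "update" returns a new copy. This follows the same spirit as `dagAction.updateFlowExecutionId(...)` in the diff, which yields `updatedDagAction` rather than mutating the `final` `flowExecutionId`.

```java
import java.util.Objects;

/** Hypothetical, trimmed-down stand-in for DagActionStore.DagAction (not the real class). */
final class ImmutableDagAction {
  /** Assumed subset of action types, for illustration only. */
  enum DagActionType { LAUNCH, KILL }

  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final String jobName;
  private final DagActionType dagActionType;
  private final boolean isReminder;
  private final long eventTimeMillis;

  // The only constructor requires every field, so no instance can exist with
  // isReminder or eventTimeMillis left unset.
  ImmutableDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
      DagActionType dagActionType, boolean isReminder, long eventTimeMillis) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId, "flowExecutionId");
    this.jobName = Objects.requireNonNull(jobName, "jobName");
    this.dagActionType = Objects.requireNonNull(dagActionType, "dagActionType");
    this.isReminder = isReminder;
    this.eventTimeMillis = eventTimeMillis;
  }

  // "Updating" returns a modified copy; the receiver, which may be shared, never changes.
  ImmutableDagAction withEventTimeMillis(long newEventTimeMillis) {
    return new ImmutableDagAction(flowGroup, flowName, flowExecutionId, jobName,
        dagActionType, isReminder, newEventTimeMillis);
  }

  // Same copy-on-change approach for the reminder flag.
  ImmutableDagAction asReminder() {
    return new ImmutableDagAction(flowGroup, flowName, flowExecutionId, jobName,
        dagActionType, true, eventTimeMillis);
  }

  boolean isReminder() { return isReminder; }
  long getEventTimeMillis() { return eventTimeMillis; }
}
```

With Lombok, annotating such a class with `@Value` and `@With` would generate comparable final fields, an all-args constructor, and `withX` copy methods.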
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -305,24 +306,26 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ? dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
- updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+ updatedDagAction, dagAction.isReminder ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbEventTimestamp.getTime(),
+ updatedDagAction.setEventTimeMillis(dbEventTimestamp.getTime());
Review Comment:
I've really had a lot of trouble grasping the motivation behind this PR...
but overall, is the crux of it that you'd like `DagActionReminderScheduler` to
operate on its own personal extension of a `DagAction`, such as the
`ThisHostDagActionReflection` example in my other comment on this hunk?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -264,12 +269,19 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
*/
@Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFinishedLeasingStatement")
public void testOlderReminderEventAcquireLease() throws IOException {
+ // Use current time as flowName to create unique event
+ DagActionStore.DagAction newLaunchAction = new DagActionStore.DagAction(flowGroup,
+ String.valueOf(System.currentTimeMillis()), flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
Review Comment:
Please avoid "current time" in a test; use a known value instead. Unless
this is somehow called in a loop, it should be easy to choose a distinct flow name.
The variable name is confusing too, given there's already a `launchDagAction2`.
Also, are tests like this the only callers of the 5-param version of the
`DagAction` constructor?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -305,24 +306,26 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ? dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
- updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+ updatedDagAction, dagAction.isReminder ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbEventTimestamp.getTime(),
+ updatedDagAction.setEventTimeMillis(dbEventTimestamp.getTime());
Review Comment:
I see this being updated, but unless I missed something critical, it won't
actually be durably persisted to the DB, will it?
`DagAction` is meant for sharing between the various distributed hosts. In
almost any distributed sharing context, mutability is at best challenging, and at
worst a tragic invitation to error. Therefore, I DON'T SUGGEST durably
persisting the event time; immutability should be our standard technique.
Where there's a need for additional info local to each host, just compose
around `DagAction`, e.g.:
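```
import lombok.Data;

import org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction;

// Host-local info composed around the shared, immutable DagAction
@Data
class ThisHostDagActionReflection {
  private final DagAction dagAction;
  private final long timestampMillis;
}
```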
[yikes! crappy name... sorry]
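The suggestion above can be expanded into a self-contained sketch that compiles without Lombok. `SharedDagAction` is a hypothetical stand-in for the immutable, cross-host `DagAction`. `ThisHostDagActionReflection` reuses the review's placeholder name and adds the host-local timestamp. The `isReminder` field and the `toReminder` method are assumptions drawn from the issue description, not from the review's snippet.

```java
import java.util.Objects;

/** Hypothetical stand-in for the shared DagAction: immutable, safe to hand between hosts. */
final class SharedDagAction {
  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final String jobName;

  SharedDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName) {
    this.flowGroup = Objects.requireNonNull(flowGroup);
    this.flowName = Objects.requireNonNull(flowName);
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId);
    this.jobName = Objects.requireNonNull(jobName);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof SharedDagAction)) return false;
    SharedDagAction that = (SharedDagAction) o;
    return flowGroup.equals(that.flowGroup) && flowName.equals(that.flowName)
        && flowExecutionId.equals(that.flowExecutionId) && jobName.equals(that.jobName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId, jobName);
  }
}

/** Host-local view composed around the shared action; never persisted or shared. */
final class ThisHostDagActionReflection {
  private final SharedDagAction dagAction;
  private final long timestampMillis;
  private final boolean isReminder;

  ThisHostDagActionReflection(SharedDagAction dagAction, long timestampMillis, boolean isReminder) {
    this.dagAction = Objects.requireNonNull(dagAction);
    this.timestampMillis = timestampMillis;
    this.isReminder = isReminder;
  }

  SharedDagAction getDagAction() { return dagAction; }
  long getTimestampMillis() { return timestampMillis; }
  boolean isReminder() { return isReminder; }

  // A reminder keyed to the consensus time is just another local wrapper over the same shared action.
  ThisHostDagActionReflection toReminder(long consensusTimestampMillis) {
    return new ThisHostDagActionReflection(dagAction, consensusTimestampMillis, true);
  }
}
```

Because the shared part carries no local timestamp, two hosts holding the same action still agree on `equals` and `hashCode`. This would matter if the action were used as a map key or compared across hosts. The local timing lives only in the wrapper.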
Issue Time Tracking
-------------------
Worklog Id: (was: 921347)
Time Spent: 20m (was: 10m)
> Add eventTime and reminder fields to DagAction POJO
> ---------------------------------------------------
>
> Key: GOBBLIN-2073
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2073
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> An event timestamp and reminder flag are used in conjunction with DagActions
> for lease arbitration and for reminder dagActions. It makes sense to include
> these fields in the DagAction object itself so the information can easily be
> transferred from one location to another. In particular, reminder events
> (used to check back in on incomplete dagAction leases) rely on these two
> fields, and they are easiest to keep track of when they are maintained within
> the consensusDagAction returned in a LeaseAttemptStatus.