phet commented on code in PR #3952:
URL: https://github.com/apache/gobblin/pull/3952#discussion_r1620079757
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java:
##########
@@ -35,9 +35,9 @@ public class FlowLaunchHandlerTest {
String cronExpressionSuffix =
truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction("flowName", "flowGroup",
- String.valueOf(flowExecutionId), "jobName",
DagActionStore.DagActionType.LAUNCH);
+ String.valueOf(flowExecutionId), "jobName",
DagActionStore.DagActionType.LAUNCH, eventToRevisit);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
- new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit,
minimumLingerDurationMillis);
+ new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction,
minimumLingerDurationMillis);
Review Comment:
`eventToRevisit` seems misnamed, since it's a timestamp, not an event. is
it the `consensusTimestampMillis`?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -52,14 +52,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final String flowExecutionId = "12345677";
+ private static final long eventTimeMillis = System.currentTimeMillis();
Review Comment:
test constants should usually be initialized to a fixed, well-known value.
would that work here?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -232,16 +234,19 @@ public void
testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
*/
@Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
- throws IOException, InterruptedException, SQLException {
+ throws IOException, SQLException {
+ /*
Review Comment:
supposed to be commented out?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -103,21 +103,23 @@ Tests all cases of trying to acquire a lease (CASES 1-6
detailed below) for a fl
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, true);
Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
- Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
- firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ long consensusEventTimeMillis = firstObtainedStatus.getEventTimeMillis();
+ Assert.assertTrue(consensusEventTimeMillis <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ // Make sure consensusEventTimeMillis is set and it's not 0 or the
original event time
+ Assert.assertTrue(consensusEventTimeMillis != eventTimeMillis &&
consensusEventTimeMillis != 0);
Review Comment:
let's use a positive assertion rather than a negative one, as the latter can
hide many possible problems; see:
https://en.wikipedia.org/wiki/Anna_Karenina_principle
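to illustrate the difference, here's a minimal sketch in plain Java. the class and method names are hypothetical (not from the PR or from Gobblin), and the lower/upper bounds are an assumption about what the test could capture around the `tryAcquireLease` call:

```java
// Hypothetical helper, NOT part of Gobblin: contrasts a negative check with a positive one.
final class ConsensusTimeChecks {
  private ConsensusTimeChecks() {}

  // Negative form: passes for ANY value other than the two excluded ones,
  // including nonsense such as -1 or Long.MAX_VALUE.
  static boolean negativeCheck(long consensusMillis, long originalMillis) {
    return consensusMillis != originalMillis && consensusMillis != 0;
  }

  // Positive form: states the range the value is expected to fall in
  // (bounds are assumed to be known to the test).
  static boolean positiveCheck(long consensusMillis, long lowerBoundMillis, long upperBoundMillis) {
    return lowerBoundMillis <= consensusMillis && consensusMillis <= upperBoundMillis;
  }
}
```

the negative form accepts `-1`, while the positive form rejects it; that's the class of problem a negative assertion can hide.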
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -232,16 +234,19 @@ public void
testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
*/
@Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
Review Comment:
what's the `F` for in the name `testCond...IfFMatching...`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -49,14 +49,25 @@ class DagAction {
final String flowExecutionId;
final String jobName;
final DagActionType dagActionType;
- final boolean isReminder;
+ boolean isReminder;
+ long eventTimeMillis;
Review Comment:
looks like an instance could be created w/o these being set. shouldn't we
prevent that via `final`?
BTW, what is the "event" behind this time? is it when the API request came
in asking to perform, say, a "kill action"? why is it critical to store that?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -305,24 +306,26 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction dagAction, lo
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
- updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ updatedDagAction, dagAction.isReminder ? "reminder" :
"original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new
LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbEventTimestamp.getTime(),
+ updatedDagAction.setEventTimeMillis(dbEventTimestamp.getTime());
Review Comment:
I've really had a lot of trouble grasping the motivation behind this PR...
but overall, is the crux of it that you'd like `DagActionReminderScheduler` to
operate on its own extension of a `DagAction`, such as the `Reflection` example
in my other comment on this same hunk?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -264,12 +269,19 @@ public void
testConditionallyAcquireLeaseIfFinishedLeasingStatement()
*/
@Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFinishedLeasingStatement")
public void testOlderReminderEventAcquireLease() throws IOException {
+ // Use current time as flowName to create unique event
+ DagActionStore.DagAction newLaunchAction = new
DagActionStore.DagAction(flowGroup,
+ String.valueOf(System.currentTimeMillis()), flowExecutionId, jobName,
DagActionStore.DagActionType.LAUNCH);
Review Comment:
please avoid "current time" in a test; use a known value instead. unless
this is somehow called in a loop, it should be easy to choose a distinct flow name.
`newLaunchAction` is a confusing name too, given there's already a `launchDagAction2`.
also, are tests like this the only callers of the 5-param version of the
`DagAction` ctor?
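one way to get a distinct yet deterministic flow name, as a rough sketch. the helper class, its method, and the base name are all hypothetical and only meant to show the idea:

```java
// Hypothetical test helper, NOT from the PR: derives a unique flow name from the
// test's own name, so the value is identical on every run (unlike currentTimeMillis()).
final class TestFlowNames {
  private static final String BASE_FLOW_NAME = "testFlowName"; // assumed base name

  private TestFlowNames() {}

  // Returns e.g. "testFlowName_testOlderReminderEventAcquireLease".
  static String flowNameFor(String testMethodName) {
    return BASE_FLOW_NAME + "_" + testMethodName;
  }
}
```

each test then passes its own method name, so a failure can be reproduced with exactly the same DB key.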
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -305,24 +306,26 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction dagAction, lo
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
- updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ updatedDagAction, dagAction.isReminder ? "reminder" :
"original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new
LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbEventTimestamp.getTime(),
+ updatedDagAction.setEventTimeMillis(dbEventTimestamp.getTime());
Review Comment:
I see this being updated, but unless I missed something critical, it won't
actually be durably persisted to the DB, will it?
`DagAction` is meant for sharing between the various distributed hosts. in
almost any distributed sharing context, mutability is at best challenging, and at
worst a tragic invitation to error. so I'm NOT suggesting that we durably
persist the event time. immutability should be our standard technique.
where there's a need for additional local-to-each-host info, just compose
around `DagAction`. e.g.:
```
@Data
class ThisHostDagActionReflection {
private final DagAction dagAction;
private final long timestampMillis;
}
```
[yikes! crappy name... sorry]
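spelled out without Lombok, the same idea might look roughly like the sketch below. `SimpleDagAction` is a simplified stand-in for `DagActionStore.DagAction`, and `HostLocalDagAction` with its `withEventTimeMillis` method is a hypothetical name; none of these exist in Gobblin:

```java
// Simplified stand-in for DagActionStore.DagAction (the real class has more fields).
final class SimpleDagAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;

  SimpleDagAction(String flowGroup, String flowName, String flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

// Hypothetical host-local wrapper: composes around the shared action instead of mutating it.
final class HostLocalDagAction {
  private final SimpleDagAction dagAction;
  private final long eventTimeMillis;

  HostLocalDagAction(SimpleDagAction dagAction, long eventTimeMillis) {
    this.dagAction = dagAction;
    this.eventTimeMillis = eventTimeMillis;
  }

  SimpleDagAction getDagAction() { return dagAction; }

  long getEventTimeMillis() { return eventTimeMillis; }

  // "Updating" the time yields a new wrapper; the shared action and the old wrapper stay untouched.
  HostLocalDagAction withEventTimeMillis(long newEventTimeMillis) {
    return new HostLocalDagAction(dagAction, newEventTimeMillis);
  }
}
```

adopting the DB's timestamp then becomes `wrapper.withEventTimeMillis(dbTime)` rather than a setter call on the shared `DagAction`.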