phet commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1605574480
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -162,12 +220,16 @@ private DagTask createDagTask(DagActionStore.DagAction
dagAction, LeaseAttemptSt
DagActionStore.DagActionType dagActionType = dagAction.getDagActionType();
switch (dagActionType) {
+ case ENFORCE_FINISH_DEADLINE:
+ return new EnforceFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
+ case ENFORCE_START_DEADLINE:
+ return new EnforceStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
Review Comment:
start and finish are typically complements, but that's not the case here,
correct? rather it's "job start" and "flow finish".
first off, please add the job vs. flow qualifier to all of these names:
action type, `DagTask`, `DagProc`, etc.
secondly, since the two operate on distinct entities, could we prevent the
start vs. finish confusion by renaming to `flowCompletionDeadline`?
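to illustrate the kind of qualifier being asked for, here is a minimal sketch. The enum name and both constant names are hypothetical suggestions, not what the PR or `DagActionStore.DagActionType` currently uses:

```java
// Hypothetical naming sketch only; not the actual DagActionStore.DagActionType enum.
enum DagActionTypeNamingSketch {
  // Job-level: a job must start within its configured deadline.
  ENFORCE_JOB_START_DEADLINE,
  // Flow-level: the whole flow must finish within its configured deadline.
  ENFORCE_FLOW_FINISH_DEADLINE
}
```

with the entity in each name, it is clear the two deadlines are not complements of each other.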
##########
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java:
##########
@@ -40,7 +40,8 @@
*/
@Slf4j
public class FlowStatusGenerator {
- public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+ public static final List<String> FINISHED_STATUSES = Lists.newArrayList(ExecutionStatus.FAILED.name(),
+ ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
Review Comment:
good work!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -69,6 +72,10 @@ public DagNodeId getDagNodeId() {
return new DagNodeId(this.flowGroup, this.flowName,
Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
}
+
+ public void setReminder(boolean isReminder) {
Review Comment:
I actually wouldn't have expected that to be necessary, given the `@Data`
and the non-`final` field
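for reference, a hand-written sketch of what Lombok's `@Data` is expected to generate, assuming the field is a non-`final` primitive `boolean isReminder` (the class name below is hypothetical, and the real `DagAction` has more fields):

```java
// Hypothetical stand-in for DagAction; shows only the accessors that @Data
// would be expected to generate for a non-final `boolean isReminder` field.
class DagActionAccessorSketch {
  private boolean isReminder;

  // For a primitive boolean field named isXxx, Lombok does not add another
  // prefix to the getter name, so the getter is isReminder().
  public boolean isReminder() {
    return this.isReminder;
  }

  // The matching setter drops the "is" prefix: setReminder(boolean).
  public void setReminder(boolean isReminder) {
    this.isReminder = isReminder;
  }
}
```

if that assumption holds, the explicit `setReminder` in this diff would duplicate the generated one.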
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -118,6 +124,8 @@ public void setUp() throws Exception {
builder =
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
Review Comment:
this is a big PR! I don't fully grasp what else changed that requires this
modification to the test setup. please explain
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -138,8 +138,8 @@ public class DagManager extends AbstractIdleService {
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
public static final String DAG_MANAGER_HEARTBEAT = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager.heartbeat-%s";
// Default job start SLA time if configured, measured in minutes. Default is 10 minutes
- private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
- private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+ public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+ public static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
Review Comment:
please add a TODO to rename both of these: move them out of DM (`DagManager`) and change "SLA" to "deadline"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -30,12 +30,14 @@
public interface DagActionStore {
public static final String NO_JOB_NAME_DEFAULT = "";
enum DagActionType {
+ CANCEL, // Invoked through DagManager if flow has been stuck in Orchestrated state for a while
KILL, // Kill invoked through API call
- RESUME, // Resume flow invoked through API call
LAUNCH, // Launch new flow execution invoked adhoc or through scheduled trigger
+ REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final job status
+ RESUME, // Resume flow invoked through API call
RETRY, // Invoked through DagManager for flows configured to allow retries
- CANCEL, // Invoked through DagManager if flow has been stuck in Orchestrated state for a while
- REEVALUATE // Re-evaluate what needs to be done upon receipt of a final job status
+ ENFORCE_START_DEADLINE, // Enforce job start deadline
+ ENFORCE_FINISH_DEADLINE, // Enforce job finish deadline
Review Comment:
are you sure it's really "job finish"? somehow I had it in my head that
it's "flow completion"