[ 
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933242
 ]

ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Sep/24 18:40
            Start Date: 04/Sep/24 18:40
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744231904


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -172,19 +159,19 @@ private void 
createFlowFinishDeadlineTrigger(DagActionStore.LeaseParams leasePar
     Dag.DagNode<JobExecutionPlan> dagNode = 
this.dagManagementStateStore.getDag(leaseParams.getDagAction().getDagId()).get().getNodes().get(0);
 
     try {
-      timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
+      timeOutForJobFinish = DagUtils.getFlowFinishDeadline(dagNode);
     } catch (ConfigException e) {
       log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid 
format, using default SLA of {}",
           
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
           
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
-          DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
-      timeOutForJobFinish = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+          ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS);
+      timeOutForJobFinish = ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS;

Review Comment:
   can these keys likewise be renamed from SLA => deadline?



##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -203,12 +178,13 @@ public class ServiceConfigKeys {
   public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
 
   public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + 
"issueRepo.class";
+  public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix.";
 
   public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
-  public static final String DAG_PROCESSING_ENGINE_ENABLED = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   public static final String NUM_DAG_PROC_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
   public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
-  public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = 
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+  public static final String JOB_START_SLA_TIME = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME;
+  public static final String JOB_START_SLA_UNITS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT;
+  public static final long DEFAULT_FLOW_SLA_MILLIS = 
TimeUnit.HOURS.toMillis(24);

Review Comment:
   why add the DPE (DagProcessingEngine) prefix?  (also, rename SLA => deadline)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,10 +267,7 @@ protected static MetricNameRegexFilter 
getMetricsFilterForDagManager() {
   }
 
   public void cleanup() {
-    // Add null check so that unit test will not affect each other when we 
de-active non-instrumented DagManager
-    if(this.metricContext != null && 
this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName()))
 {
-      // The DMThread's metrics mappings follow the lifecycle of the DMThread 
itself and so are lost by DM deactivation-reactivation but the 
RootMetricContext is a (persistent) singleton.
-      // To avoid IllegalArgumentException by the RMC preventing (re-)add of a 
metric already known, remove all metrics that a new DMThread thread would 
attempt to add (in DagManagerThread::initialize) whenever running 
post-re-enablement

Review Comment:
   since we no longer have DMThreads, are the guard and/or 
`getMetricsFilterForDagManager` still needed?
   
   also, should that method (L269, above) be renamed?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -249,9 +250,9 @@ public static void 
sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStor
 
   public static long getDefaultJobStartDeadline(Config config) {
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
-        config, DagManager.JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
-    return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, 
DagManager.JOB_START_SLA_TIME,
-        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+        config, ServiceConfigKeys.JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_DEADLINE_TIME_UNIT));
+    return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, 
ServiceConfigKeys.JOB_START_SLA_TIME,
+        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_DEADLINE_TIME));

Review Comment:
   can we rename here too: SLA => deadline?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java:
##########
@@ -60,10 +62,29 @@ public class DagManagerUtilsTest {
       .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
           MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
 
+  @Test
+  void slaConfigCheck() throws Exception {
+    Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("5", 123456783L, 
"FINISH_RUNNING", 1);
+    
Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), 
ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS);

Review Comment:
   also here: rename SLA => deadline



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -42,7 +42,7 @@
 
 /**
  * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
- * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ * {@link org.apache.gobblin.service.ServiceConfigKeys#JOB_START_SLA_TIME} 
time.

Review Comment:
   rename here too: SLA => deadline



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
   Map<URI, TopologySpec> topologySpecMap;
   private final Config config;
   public static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
-  public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  public static final String DAG_STATESTORE_CLASS_KEY = 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + 
"dagStateStoreClass";

Review Comment:
   is it possible to avoid qualifying this config key with the DPE's (DagProcessingEngine's) name?



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1071,13 +1071,13 @@ public class ConfigurationKeys {
    * Configuration properties related to Flows
    */
   public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
-  public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
-  public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = 
"gobblin.flow.sla.timeunit";
-  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = 
TimeUnit.MINUTES.name();
-  public static final String GOBBLIN_JOB_START_SLA_TIME = 
"gobblin.job.start.sla.time";
-  public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"gobblin.job.start.sla.timeunit";
-  public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
-  public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = 
TimeUnit.MINUTES.name();
+  public static final String GOBBLIN_FLOW_DEADLINE_TIME = 
"gobblin.flow.deadline.time";
+  public static final String GOBBLIN_FLOW_DEADLINE_TIME_UNIT = 
"gobblin.flow.deadline.timeunit";

Review Comment:
   suggest `gobblin.flow.completion.deadline.time` or 
`gobblin.flow.finish.deadline.time` to parallel 
`gobblin.job.start.deadline.time` (rather than dropping one segment entirely)



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java:
##########
@@ -60,10 +62,29 @@ public class DagManagerUtilsTest {
       .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
           MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
 
+  @Test
+  void slaConfigCheck() throws Exception {

Review Comment:
   shall we rename SLA => deadline?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 933242)
    Time Spent: 3.5h  (was: 3h 20m)

> remove obsolete code related to DagManager
> ------------------------------------------
>
>                 Key: GOBBLIN-2136
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to