[
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933242
]
ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Sep/24 18:40
Start Date: 04/Sep/24 18:40
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744231904
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -172,19 +159,19 @@ private void
createFlowFinishDeadlineTrigger(DagActionStore.LeaseParams leasePar
Dag.DagNode<JobExecutionPlan> dagNode =
this.dagManagementStateStore.getDag(leaseParams.getDagAction().getDagId()).get().getNodes().get(0);
try {
- timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
+ timeOutForJobFinish = DagUtils.getFlowFinishDeadline(dagNode);
} catch (ConfigException e) {
log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid
format, using default SLA of {}",
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
- DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
- timeOutForJobFinish = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS);
+ timeOutForJobFinish = ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS;
Review Comment:
Can these keys likewise be renamed from SLA => deadline?
##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -203,12 +178,13 @@ public class ServiceConfigKeys {
public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX +
"issueRepo.class";
+ public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix.";
public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
- public static final String DAG_PROCESSING_ENGINE_ENABLED =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
public static final String NUM_DAG_PROC_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
- public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED =
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+ public static final String JOB_START_SLA_TIME =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME;
+ public static final String JOB_START_SLA_UNITS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT;
+ public static final long DEFAULT_FLOW_SLA_MILLIS =
TimeUnit.HOURS.toMillis(24);
Review Comment:
Why add the DPE prefix? (Also, SLA => deadline.)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,10 +267,7 @@ protected static MetricNameRegexFilter
getMetricsFilterForDagManager() {
}
public void cleanup() {
- // Add null check so that unit test will not affect each other when we
de-active non-instrumented DagManager
- if(this.metricContext != null &&
this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName()))
{
- // The DMThread's metrics mappings follow the lifecycle of the DMThread
itself and so are lost by DM deactivation-reactivation but the
RootMetricContext is a (persistent) singleton.
- // To avoid IllegalArgumentException by the RMC preventing (re-)add of a
metric already known, remove all metrics that a new DMThread thread would
attempt to add (in DagManagerThread::initialize) whenever running
post-re-enablement
Review Comment:
Since we no longer have DMThreads, are the guard and/or
`getMetricsFilterForDagManager` still needed?
Also, should that method (L269, above) be renamed?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -249,9 +250,9 @@ public static void
sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStor
public static long getDefaultJobStartDeadline(Config config) {
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
- config, DagManager.JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
- return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config,
DagManager.JOB_START_SLA_TIME,
- ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+ config, ServiceConfigKeys.JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_DEADLINE_TIME_UNIT));
+ return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config,
ServiceConfigKeys.JOB_START_SLA_TIME,
+ ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_DEADLINE_TIME));
Review Comment:
Can we rename here too: SLA => deadline?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java:
##########
@@ -60,10 +62,29 @@ public class DagManagerUtilsTest {
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
+ @Test
+ void slaConfigCheck() throws Exception {
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("5", 123456783L,
"FINISH_RUNNING", 1);
+
Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)),
ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS);
Review Comment:
Same here: SLA => deadline (e.g., `DEFAULT_FLOW_SLA_MILLIS`).
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -42,7 +42,7 @@
/**
* An implementation for {@link DagProc} that marks the {@link Dag} as failed
and cancel the job if it does not start in
- * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
+ * {@link org.apache.gobblin.service.ServiceConfigKeys#JOB_START_SLA_TIME}
time.
Review Comment:
SLA => deadline here as well (i.e., rename `JOB_START_SLA_TIME`).
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
public static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
- public static final String DAG_STATESTORE_CLASS_KEY =
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ public static final String DAG_STATESTORE_CLASS_KEY =
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
"dagStateStoreClass";
Review Comment:
Is it possible to avoid qualifying this config key with the DPE's name?
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1071,13 +1071,13 @@ public class ConfigurationKeys {
* Configuration properties related to Flows
*/
public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
- public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
- public static final String GOBBLIN_FLOW_SLA_TIME_UNIT =
"gobblin.flow.sla.timeunit";
- public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT =
TimeUnit.MINUTES.name();
- public static final String GOBBLIN_JOB_START_SLA_TIME =
"gobblin.job.start.sla.time";
- public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT =
"gobblin.job.start.sla.timeunit";
- public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
- public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT =
TimeUnit.MINUTES.name();
+ public static final String GOBBLIN_FLOW_DEADLINE_TIME =
"gobblin.flow.deadline.time";
+ public static final String GOBBLIN_FLOW_DEADLINE_TIME_UNIT =
"gobblin.flow.deadline.timeunit";
Review Comment:
Suggest `gobblin.flow.completion.deadline.time` or
`gobblin.flow.finish.deadline.time`, to parallel
`gobblin.job.start.deadline.time` (rather than dropping one segment entirely).
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java:
##########
@@ -60,10 +62,29 @@ public class DagManagerUtilsTest {
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
+ @Test
+ void slaConfigCheck() throws Exception {
Review Comment:
Shall we rename SLA => deadline?
Issue Time Tracking
-------------------
Worklog Id: (was: 933242)
Time Spent: 3.5h (was: 3h 20m)
> remove obsolete code related to DagManager
> ------------------------------------------
>
> Key: GOBBLIN-2136
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)