[
https://issues.apache.org/jira/browse/GOBBLIN-1868?focusedWorklogId=874740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-874740
]
ASF GitHub Bot logged work on GOBBLIN-1868:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Aug/23 20:54
Start Date: 04/Aug/23 20:54
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284839495
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
Review Comment:
This is a service-level default that indicates whether flow concurrency is
enabled; renaming to `isFlowConcurrencyEnabled`.
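
For illustration only, here is a minimal sketch of what the renamed field could look like when it is read with a fallback default. The class `ConcurrencySetting`, the key string, and the default value are hypothetical stand-ins; a plain `Map` replaces Gobblin's `Config`/`ConfigUtils` so the snippet stays self-contained. This is not the code in the PR.

```java
import java.util.Map;

/** Hypothetical holder for the service-level flow concurrency default. */
final class ConcurrencySetting {
  // Assumed key name and default; the real values live in ServiceConfigKeys.
  static final String FLOW_CONCURRENCY_ALLOWED = "flowConcurrencyAllowed";
  static final boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;

  // The boolean name now states what "true" means.
  final boolean isFlowConcurrencyEnabled;

  ConcurrencySetting(Map<String, String> config) {
    String raw = config.get(FLOW_CONCURRENCY_ALLOWED);
    // Fall back to the service-level default when the key is absent.
    this.isFlowConcurrencyEnabled =
        raw == null ? DEFAULT_FLOW_CONCURRENCY_ALLOWED : Boolean.parseBoolean(raw);
  }
}
```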
Issue Time Tracking
-------------------
Worklog Id: (was: 874740)
Time Spent: 1h 50m (was: 1h 40m)
> Refactor Common Utils between Orchestrator & DagManager
> -------------------------------------------------------
>
> Key: GOBBLIN-1868
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1868
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Create a Util class to contain functionality re-used between the DagManager
> and Orchestrator when launching executions of a flow spec. In the common
> case, the Orchestrator receives a flow to orchestrate, performs necessary
> validations, and forwards the execution responsibility to the DagManager. The
> DagManager's responsibility is to carry out any flow action requests.
> However, with launch executions now being stored in the DagActionStateStore,
> on restart or leadership change the DagManager has to perform validations
> before executing any launch actions the previous leader was unable to
> complete. Rather than duplicating the code or introducing a circular
> dependency between the DagManager and Orchestrator, this class is utilized to
> store the common functionality. It is stateless and requires all stateful
> pieces to be passed as input from the caller.
> * Note: We expect further refactoring of the DagManager in later stages of
> multi-active development, so we do not attempt *major* reorganization, as
> abstractions may change.
>
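
The "stateless, all state passed in by the caller" design described above can be sketched roughly as follows. The class `FlowLaunchChecks`, its method, and the use of a `Set<String>` of running flow ids are assumptions made for illustration; they are not the utility class added under this issue.

```java
import java.util.Set;

/** Hypothetical stateless helper shared by two callers (e.g. an orchestrator and a DAG manager). */
final class FlowLaunchChecks {
  private FlowLaunchChecks() {} // no instances, no fields: nothing to keep in sync

  /**
   * Decides whether a flow may be launched. Every stateful input (the set of
   * running flows and the service-level concurrency default) is supplied by
   * the caller, so neither caller needs a reference to the other.
   */
  static boolean isLaunchAllowed(String flowId, Set<String> runningFlowIds,
      boolean isFlowConcurrencyEnabled) {
    return isFlowConcurrencyEnabled || !runningFlowIds.contains(flowId);
  }
}
```

Because the helper holds no state, both callers can invoke it with their own view of the running flows without introducing a dependency on each other.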