[ 
https://issues.apache.org/jira/browse/GOBBLIN-1868?focusedWorklogId=874740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-874740
 ]

ASF GitHub Bot logged work on GOBBLIN-1868:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Aug/23 20:54
            Start Date: 04/Aug/23 20:54
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284839495


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever 
jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = 
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
 config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);

Review Comment:
   this is a service level default to indicate if flowCurrency is enabled: 
renaming to `isFlowConcurrencyEnabled`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 874740)
    Time Spent: 1h 50m  (was: 1h 40m)

> Refactor Common Utils between Orchestrator & DagManager
> -------------------------------------------------------
>
>                 Key: GOBBLIN-1868
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1868
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Create a Util class to contain functionality re-used between the DagManager 
> and Orchestrator when launching executions of a flow spec. In the common 
> case, the Orchestrator receives a flow to orchestrate, performs necessary 
> validations, and forwards the execution responsibility to the DagManager. The 
> DagManager's responsibility is to carry out any flow action requests. 
> However, with launch executions now being stored in the DagActionStateStore, 
> on restart or leadership change the DagManager has to perform validations 
> before executing any launch actions the previous leader was unable to 
> complete. Rather than duplicating the code or introducing a circular 
> dependency between the DagManager and Orchestrator, this class is utilized to 
> store the common functionality. It is stateless and requires all stateful 
> pieces to be passed as input from the caller.
> * Note: We expect further refactoring to be done to the DagManager in later 
> stage of multi-active development so we do not attempt *major* reorganization 
> as abstractions may change. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to