This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 347712f26 [GOBBLIN-1756] Fix the issue that we skipping flows for 
multihop jobs (#3617)
347712f26 is described below

commit 347712f2610353c806566435f1107ac64166fce8
Author: Zihan Li <[email protected]>
AuthorDate: Wed Dec 14 14:39:55 2022 -0800

    [GOBBLIN-1756] Fix the issue that we skipping flows for multihop jobs 
(#3617)
    
    * address comments
    
    * use connectionmanager when httpclient is not cloesable
    
    * [hotfix] Fix the issue that we skipping flows for multihop jobs
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java | 1 -
 .../gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java     | 1 -
 .../apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java   | 6 ------
 .../apache/gobblin/service/modules/orchestration/DagManager.java    | 5 +----
 .../service/modules/scheduler/GobblinServiceJobScheduler.java       | 1 -
 5 files changed, 1 insertion(+), 13 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index f80177a8d..d93eefbe2 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -33,7 +33,6 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
   public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
 
-  public static final String GOBBLIN_SERVICE_ADHOC_FLOW = 
GOBBLIN_SERVICE_PREFIX + "adhoc.flow";
 
   public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY 
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 86fee4a68..5b06f1eb4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -206,7 +206,6 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
         
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
 || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
       try {
         userQuotaManager.get().checkQuota(dag.getStartNodes());
-        
flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
 "true");
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 20d2339c7..d4ee50729 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -287,12 +287,6 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     Instrumented.markMeter(flowCompilationSuccessFulMeter);
     Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
 
-    if 
(Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW)))
 {
-      for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getStartNodes()) {
-        
dagNode.getValue().getJobSpec().getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
 "true");
-      }
-    }
-
     return jobExecutionPlanDag;
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5df4fb35f..0dd31d274 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -981,10 +981,7 @@ public class DagManager extends AbstractIdleService {
       // Run this spec on selected executor
       SpecProducer<Spec> producer;
       try {
-        if 
(!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty(
-            ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) {
-          quotaManager.checkQuota(Collections.singleton(dagNode));
-        }
+        quotaManager.checkQuota(Collections.singleton(dagNode));
 
         producer = DagManagerUtils.getSpecProducer(dagNode);
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index c997f0603..3f0791062 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -336,7 +336,6 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
         // QuotaManager has idempotent checks for a dagNode, so this check 
won't double add quotas for a flow in the DagManager
         try {
           quotaManager.get().checkQuota(dag.getStartNodes());
-          ((FlowSpec) 
addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
 "true");
         } catch (IOException e) {
           throw new RuntimeException(e);
         }

Reply via email to