This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 347712f26 [GOBBLIN-1756] Fix the issue that we are skipping flows for
multihop jobs (#3617)
347712f26 is described below
commit 347712f2610353c806566435f1107ac64166fce8
Author: Zihan Li <[email protected]>
AuthorDate: Wed Dec 14 14:39:55 2022 -0800
[GOBBLIN-1756] Fix the issue that we are skipping flows for multihop jobs
(#3617)
* address comments
* use connectionmanager when httpclient is not closeable
* [hotfix] Fix the issue that we are skipping flows for multihop jobs
Co-authored-by: Zihan Li <[email protected]>
---
.../src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java | 1 -
.../gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java | 1 -
.../apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java | 6 ------
.../apache/gobblin/service/modules/orchestration/DagManager.java | 5 +----
.../service/modules/scheduler/GobblinServiceJobScheduler.java | 1 -
5 files changed, 1 insertion(+), 13 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index f80177a8d..d93eefbe2 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -33,7 +33,6 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
- public static final String GOBBLIN_SERVICE_ADHOC_FLOW =
GOBBLIN_SERVICE_PREFIX + "adhoc.flow";
public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 86fee4a68..5b06f1eb4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -206,7 +206,6 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
|| PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
try {
userQuotaManager.get().checkQuota(dag.getStartNodes());
-
flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
"true");
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 20d2339c7..d4ee50729 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -287,12 +287,6 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
Instrumented.markMeter(flowCompilationSuccessFulMeter);
Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
- if
(Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW)))
{
- for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getStartNodes()) {
-
dagNode.getValue().getJobSpec().getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
"true");
- }
- }
-
return jobExecutionPlanDag;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5df4fb35f..0dd31d274 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -981,10 +981,7 @@ public class DagManager extends AbstractIdleService {
// Run this spec on selected executor
SpecProducer<Spec> producer;
try {
- if
(!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty(
- ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) {
- quotaManager.checkQuota(Collections.singleton(dagNode));
- }
+ quotaManager.checkQuota(Collections.singleton(dagNode));
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index c997f0603..3f0791062 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -336,7 +336,6 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
// QuotaManager has idempotent checks for a dagNode, so this check
won't double add quotas for a flow in the DagManager
try {
quotaManager.get().checkQuota(dag.getStartNodes());
- ((FlowSpec)
addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
"true");
} catch (IOException e) {
throw new RuntimeException(e);
}