This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0936016be [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
0936016be is described below

commit 0936016be7e3b2a098fb8666fbb6829ea6083e46
Author: Zihan Li <[email protected]>
AuthorDate: Tue Oct 18 13:53:19 2022 -0700

    [GOBBLIN-1725] Fix bugs in gaas warm standby mode (#3582)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1725] Fix bugs in gaas warm standby mode
    
    * fix missing config
    
    * address comments
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../service/modules/flow/BaseFlowToJobSpecCompiler.java   | 13 +++++++------
 .../gobblin/service/modules/orchestration/DagManager.java | 15 +++++++++------
 .../gobblin/service/monitoring/GitFlowGraphMonitor.java   |  2 +-
 .../service/modules/orchestration/DagManagerFlowTest.java |  9 +++------
 4 files changed, 20 insertions(+), 19 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 976d12650..d0e997771 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import javax.inject.Inject;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
@@ -191,7 +192,12 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
     // always try to compile the flow to verify if it is compilable
     Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
 
-    if (this.warmStandbyEnabled &&
+    // If dag is null then a compilation error has occurred
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    }
+
+    if (FlowCatalog.isCompileSuccessful(response) && this.warmStandbyEnabled 
&& !flowSpec.isExplain() &&
         
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
 || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
       try {
         userQuotaManager.checkQuota(dag.getStartNodes());
@@ -201,11 +207,6 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
       }
     }
 
-    // If dag is null then a compilation error has occurred
-    if (dag != null && !dag.isEmpty()) {
-      response = dag.toString();
-    }
-    // todo: should we check quota here?
     return new AddSpecResponse<>(response);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 4391a5f9b..0bcceaf62 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -200,11 +200,13 @@ public class DagManager extends AbstractIdleService {
   private final Optional<EventSubmitter> eventSubmitter;
   private final long failedDagRetentionTime;
   private final DagManagerMetrics dagManagerMetrics;
-  private final Optional<DagActionStore> dagActionStore;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
 
   private volatile boolean isActive = false;
 
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Optional<DagActionStore> dagActionStore, boolean instrumentationEnabled) {
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
     this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) 
initializeDagQueue(this.numThreads);
@@ -221,7 +223,6 @@ public class DagManager extends AbstractIdleService {
     } else {
       this.eventSubmitter = Optional.absent();
     }
-    this.dagActionStore = dagActionStore;
     this.dagManagerMetrics = new DagManagerMetrics(metricContext);
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -249,9 +250,9 @@ public class DagManager extends AbstractIdleService {
     return queue;
   }
 
-  @Inject(optional = true)
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Optional<DagActionStore> dagActionStore) {
-    this(config, jobStatusRetriever, dagActionStore, true);
+  @Inject
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
+    this(config, jobStatusRetriever, true);
   }
 
   /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s 
and loading of any {@link Dag}s is done
@@ -665,6 +666,8 @@ public class DagManager extends AbstractIdleService {
         props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
         sendCancellationEvent(dagNodeToCancel.getValue());
       }
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          
ConfigUtils.getString(dagNodeToCancel.getValue().getJobSpec().getConfig(), 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ""));
       
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props);
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
index 684ab2c90..68174fcd6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -66,9 +66,9 @@ public class GitFlowGraphMonitor extends GitMonitoringService 
implements FlowGra
       .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
       .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
       .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
-      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
       .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, 
ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
       .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, 
ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+      .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
       .put(SHOULD_CHECKPOINT_HASHES, false).build());
 
   private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 71ddd9333..59aeda41f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -88,7 +88,8 @@ public class DagManagerFlowTest {
     dagActionStore = new MysqlDagActionStore(config);
     dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionValue.KILL);
     dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.DagActionValue.RESUME);
-    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), 
Optional.of(dagActionStore), false);
+    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), 
false);
+    dagManager.dagActionStore = Optional.of(dagActionStore);
     dagManager.setActive(true);
     this.dagNumThreads = dagManager.getNumThreads();
     Thread.sleep(10000);
@@ -325,11 +326,7 @@ class CancelPredicate implements Predicate<Void> {
 class MockedDagManager extends DagManager {
 
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), Optional.absent(), 
instrumentationEnabled);
-  }
-
-  public MockedDagManager(Config config, Optional<DagActionStore> 
dagactionStore, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), dagactionStore, 
instrumentationEnabled);
+    super(config, createJobStatusRetriever(), instrumentationEnabled);
   }
 
   private static JobStatusRetriever createJobStatusRetriever() {

Reply via email to