Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c47f43dc3 -> 7af61741c


[GOBBLIN-495] Remove flowSpec when it is a runOnce case

Closes #2366 from yukuai518/flowSpec


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7af61741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7af61741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7af61741

Branch: refs/heads/master
Commit: 7af61741c0206c87052b2e769c245ceb21673d97
Parents: c47f43d
Author: Kuai Yu <[email protected]>
Authored: Fri May 18 13:48:51 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri May 18 13:48:51 2018 -0700

----------------------------------------------------------------------
 .../service/FlowConfigsResourceHandler.java     |  8 ++--
 ...GobblinServiceFlowConfigResourceHandler.java | 45 +++++++++++++-------
 .../scheduler/GobblinServiceJobScheduler.java   | 27 +++++++-----
 3 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7af61741/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index b92ab9d..55850f6 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -25,22 +25,22 @@ import com.linkedin.restli.server.UpdateResponse;
 
 public interface FlowConfigsResourceHandler {
   /**
-   * Get flow config
+   * Get {@link FlowConfig}
    */
   FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException;
 
   /**
-   * Add flow config can be done locally only iff current node is a master
+   * Add {@link FlowConfig}
    */
   CreateResponse createFlowConfig(FlowConfig flowConfig) throws 
FlowConfigLoggedException;
 
   /**
-   * Update flow config can be done locally only iff current node is a master
+   * Update {@link FlowConfig}
    */
   UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws 
FlowConfigLoggedException;
 
   /**
-   * Delete flow config can be done locally only iff current node is a master
+   * Delete {@link FlowConfig}
    */
   UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws 
FlowConfigLoggedException;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7af61741/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 55d9cf0..d0e7737 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -45,9 +45,9 @@ import org.apache.gobblin.service.modules.utils.HelixUtils;
 
 
 /**
- * A high available flow config resource handler which consider the leadership 
change.
- * When a non-master status detected, it will forward the rest-li request
- * to the master node. Otherwise it will handle it locally.
+ * An HA (highly available) aware {@link FlowConfigsResourceHandler} which considers whether the current node is Active or Standby.
+ * When Standby mode is detected, it forwards the rest-li request ({@link FlowConfig})
+ * to the Active node. Otherwise it handles the request locally.
  */
 @Slf4j
 public class GobblinServiceFlowConfigResourceHandler implements 
FlowConfigsResourceHandler {
@@ -76,11 +76,16 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
   }
 
   /**
-   * Method to handle create Restli request.
-   * In load balance mode, we will handle flowCatalog I/O locally before 
forwarding the message to Helix (Active) node.
-   * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle 
flowCatalog I/O.
+   * Adding {@link FlowConfig} should check if current node is active (master).
+   * If current node is active, call {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)} directly.
+   * If current node is standby, forward {@link 
ServiceConfigKeys#HELIX_FLOWSPEC_ADD} to active. The remote active will
+   * then call {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}.
    *
-   * The listeners of {@link 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in 
balance mode.
+   * Please refer to {@link 
org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory}
 for remote handling.
+   *
+   * For better I/O load balance, the user can enable {@link GobblinServiceFlowConfigResourceHandler#flowCatalogLocalCommit}.
+   * The {@link FlowConfig} will then be persisted to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} before it is
+   * forwarded to the active node (if the current node is standby) for execution.
    */
   @Override
   public CreateResponse createFlowConfig(FlowConfig flowConfig)
@@ -111,11 +116,16 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
   }
 
   /**
-   * Method to handle update Restli request.
-   * In load balance mode, we will handle flowCatalog I/O locally before 
forwarding the message to Helix (Active) node.
-   * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle 
flowCatalog I/O.
+   * Updating {@link FlowConfig} should check if current node is active 
(master).
+   * If current node is active, call {@link 
FlowConfigResourceLocalHandler#updateFlowConfig(FlowId, FlowConfig)} directly.
+   * If current node is standby, forward {@link 
ServiceConfigKeys#HELIX_FLOWSPEC_UPDATE} to active. The remote active will
+   * then call {@link FlowConfigResourceLocalHandler#updateFlowConfig(FlowId, 
FlowConfig)}.
+   *
+   * Please refer to {@link 
org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory}
 for remote handling.
    *
-   * The listeners of {@link 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in 
balance mode.
+   * For better I/O load balance, the user can enable {@link GobblinServiceFlowConfigResourceHandler#flowCatalogLocalCommit}.
+   * The {@link FlowConfig} will then be persisted to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} before it is
+   * forwarded to the active node (if the current node is standby) for execution.
    */
   @Override
   public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig)
@@ -154,11 +164,16 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
   }
 
   /**
-   * Method to handle delete Restli request.
-   * In load balance mode, we will handle flowCatalog I/O locally before 
forwarding the message to Helix (Active) node.
-   * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle 
flowCatalog I/O.
+   * Deleting {@link FlowConfig} should check if current node is active 
(master).
+   * If current node is active, call {@link 
FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)} directly.
+   * If current node is standby, forward {@link 
ServiceConfigKeys#HELIX_FLOWSPEC_REMOVE} to active. The remote active will
+   * then call {@link FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, 
Properties)}.
+   *
+   * Please refer to {@link 
org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory}
 for remote handling.
    *
-   * The listeners of {@link 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in 
balance mode.
+   * For better I/O load balance, the user can enable {@link GobblinServiceFlowConfigResourceHandler#flowCatalogLocalCommit}.
+   * The {@link FlowConfig} will then be persisted to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} before it is
+   * forwarded to the active node (if the current node is standby) for execution.
    */
   @Override
   public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7af61741/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 506a53e..1ec791a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -21,6 +21,8 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixManager;
@@ -108,8 +110,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
     // Since we are going to change status to isActive=true, schedule all flows
     if (isActive) {
-      // Need to set active first; otherwise in the STANDBY->ACTIVE transition,
-      // the onAddSpec will forward specs to the leader, which is itself.
+      // Need to set active=true first; otherwise, in onAddSpec(), the node will forward specs to the active node, which is itself.
       this.isActive = isActive;
       if (this.flowCatalog.isPresent()) {
         Collection<Spec> specs = 
this.flowCatalog.get().getSpecsWithTimeUpdate();
@@ -129,8 +130,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       for (Spec spec : this.scheduledFlowSpecs.values()) {
         onDeleteSpec(spec.getUri(), spec.getVersion());
       }
-      // Need to set active at the end; otherwise in the ACTIVE->STANDBY 
transition,
-      // the onDeleteSpec will forward specs to the leader, which is itself.
+      // Need to set active=false at the end; otherwise, in onDeleteSpec(), the node will forward specs to the active node, which is itself.
       this.isActive = isActive;
     }
   }
@@ -193,14 +193,15 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
     if (addedSpec instanceof FlowSpec) {
       try {
+        FlowSpec flowSpec  = (FlowSpec) addedSpec;
         Properties jobConfig = new Properties();
         Properties flowSpecProperties = ((FlowSpec) 
addedSpec).getConfigAsProperties();
         jobConfig.putAll(this.properties);
         jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, 
addedSpec.getUri().toString());
         jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
-            ((FlowSpec) 
addedSpec).getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
+            
flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
         jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
+            ConfigUtils.getString((flowSpec).getConfig(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
         if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) 
&& StringUtils
             
.isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))
 {
           jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
@@ -214,11 +215,11 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           scheduleJob(jobConfig, null);
           if (PropertiesUtils.getPropAsBoolean(jobConfig, 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
             _log.info("RunImmediately requested, hence executing FlowSpec: " + 
addedSpec);
-            this.jobExecutor.execute(new NonScheduledJobRunner(jobConfig, 
null));
+            this.jobExecutor.execute(new 
NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
           }
         } else {
           _log.info("No FlowSpec schedule found, so running FlowSpec: " + 
addedSpec);
-          this.jobExecutor.execute(new NonScheduledJobRunner(jobConfig, null));
+          this.jobExecutor.execute(new 
NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
         }
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName,  
addedSpec, je);
@@ -321,19 +322,25 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
    * This class is responsible for running non-scheduled jobs.
    */
   class NonScheduledJobRunner implements Runnable {
-
+    private final URI specUri;
     private final Properties jobConfig;
     private final JobListener jobListener;
+    private final boolean removeSpec;
 
-    public NonScheduledJobRunner(Properties jobConfig, JobListener 
jobListener) {
+    public NonScheduledJobRunner(URI uri, boolean removeSpec, Properties 
jobConfig, JobListener jobListener) {
+      this.specUri = uri;
       this.jobConfig = jobConfig;
       this.jobListener = jobListener;
+      this.removeSpec = removeSpec;
     }
 
     @Override
     public void run() {
       try {
         GobblinServiceJobScheduler.this.runJob(this.jobConfig, 
this.jobListener);
+        if (flowCatalog.isPresent() && removeSpec) {
+          GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, 
new Properties(), false);
+        }
       } catch (JobException je) {
         _log.error("Failed to run job " + 
this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
       }

Reply via email to