[GOBBLIN-458] Refactor flowConfig resource handler

Closes #2329 from yukuai518/delete


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/01302a6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/01302a6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/01302a6d

Branch: refs/heads/master
Commit: 01302a6db2c468507c0adea5daa69de997aaf14b
Parents: 2f76947
Author: Kuai Yu <[email protected]>
Authored: Wed May 16 11:40:22 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Wed May 16 11:40:22 2018 -0700

----------------------------------------------------------------------
 .../gobblin/service/ServiceConfigKeys.java      |   5 +
 .../apache/gobblin/service/FlowConfigTest.java  |   2 +-
 .../service/FlowConfigLoggedException.java      |  41 ++++
 .../service/FlowConfigResourceLocalHandler.java | 202 ++++++++++++++++++
 .../gobblin/service/FlowConfigsResource.java    | 182 ++--------------
 .../service/FlowConfigsResourceHandler.java     |  47 +++++
 .../runtime/spec_catalog/FlowCatalog.java       |  34 +--
 ...trollerUserDefinedMessageHandlerFactory.java | 206 +++++++++++++++++++
 .../service/modules/core/GitConfigMonitor.java  |   7 +-
 .../modules/core/GobblinServiceManager.java     | 162 ++++-----------
 .../service/modules/restli/FlowConfigUtils.java | 105 ++++++++++
 ...GobblinServiceFlowConfigResourceHandler.java | 206 +++++++++++++++++++
 .../scheduler/GobblinServiceJobScheduler.java   |  48 +----
 .../modules/core/GobblinServiceHATest.java      |  58 +++++-
 .../modules/restli/FlowConfigUtilsTest.java     | 164 +++++++++++++++
 gobblin-service/src/test/resources/log4j.xml    |  40 ++++
 .../apache/gobblin/util/PropertiesUtils.java    |  21 +-
 17 files changed, 1177 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 7231e0c..72e06a0 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -47,6 +47,11 @@ public class ServiceConfigKeys {
 
   // Flow Compiler Keys
   public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = 
GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
+
+  // Flow Catalog Keys
+  public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = 
GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
+  public static final boolean 
DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = true;
+
   /**
    * Directly use canonical class name here to avoid introducing additional 
dependency here.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index a373762..951103c 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -82,7 +82,7 @@ public class FlowConfigTest {
     Injector injector = Guice.createInjector(new Module() {
        @Override
        public void configure(Binder binder) {
-         
binder.bind(FlowCatalog.class).annotatedWith(Names.named("flowCatalog")).toInstance(flowCatalog);
+         
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(new
 FlowConfigResourceLocalHandler(flowCatalog));
          // indicate that we are in unit testing since the resource is being 
blocked until flow catalog changes have
          // been made
          
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java
new file mode 100644
index 0000000..848498a
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.RestLiServiceException;
+
+/**
+ * Exception thrown by {@link FlowConfigsResourceHandler} when it cannot 
handle Restli gracefully.
+ */
+public class FlowConfigLoggedException extends RestLiServiceException {
+  private static final Logger log = 
LoggerFactory.getLogger(FlowConfigLoggedException.class);
+
+  public FlowConfigLoggedException(final HttpStatus status, final String 
message) {
+    super(status, message);
+    log.error(message);
+  }
+
+  public FlowConfigLoggedException(final HttpStatus status, final String 
message, final Throwable cause) {
+    super(status, message, cause);
+    log.error(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
new file mode 100644
index 0000000..0007b6a
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Maps;
+import com.linkedin.data.template.StringMap;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+/**
+ * A {@link FlowConfigsResourceHandler} that handles Restli locally.
+ */
+@Slf4j
+public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandler {
+  @Getter
+  private FlowCatalog flowCatalog;
+  public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
+    this.flowCatalog = flowCatalog;
+  }
+
+  /**
+   * Get flow config
+   */
+  public FlowConfig getFlowConfig(FlowId flowId) throws 
FlowConfigLoggedException {
+    log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", 
flowId.getFlowGroup(), flowId.getFlowName());
+
+    try {
+      URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
+      URI flowUri = new URI(flowCatalogURI.getScheme(), 
flowCatalogURI.getAuthority(),
+          "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, 
null);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
+      FlowConfig flowConfig = new FlowConfig();
+      Properties flowProps = spec.getConfigAsProperties();
+      Schedule schedule = null;
+
+      if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        schedule = new Schedule();
+        
schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      }
+      if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
+        
flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH));
+      } else if (spec.getTemplateURIs().isPresent()) {
+        
flowConfig.setTemplateUris(StringUtils.join(spec.getTemplateURIs().get(), ","));
+      } else {
+        flowConfig.setTemplateUris("NA");
+      }
+      if (schedule != null) {
+        if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) {
+          
schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)));
+        }
+
+        flowConfig.setSchedule(schedule);
+      }
+
+      // remove keys that were injected as part of flowSpec creation
+      flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
+      flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
+
+      StringMap flowPropsAsStringMap = new StringMap();
+      flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
+
+      return flowConfig.setId(new 
FlowId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName()))
+          .setProperties(flowPropsAsStringMap);
+    } catch (URISyntaxException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad 
URI " + flowId.getFlowName(), e);
+    } catch (SpecNotFoundException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_404_NOT_FOUND, "Flow 
requested does not exist: " + flowId.getFlowName(), null);
+    }
+  }
+
+  /**
+   * Add flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
+   */
+  public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
+    log.info("[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName());
+    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+    this.flowCatalog.put(flowSpec, triggerListener);
+    return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new 
EmptyRecord()), HttpStatus.S_201_CREATED);
+  }
+
+  /**
+   * Add flowConfig locally and trigger all listeners
+   */
+  public CreateResponse createFlowConfig(FlowConfig flowConfig) throws 
FlowConfigLoggedException {
+    return this.createFlowConfig(flowConfig, true);
+  }
+
+  /**
+   * Update flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
+   */
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, 
boolean triggerListener) {
+    log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", 
flowId.getFlowGroup(), flowId.getFlowName());
+
+    if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || 
!flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+
+  /**
+   * Update flowConfig locally and trigger all listeners
+   */
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) 
throws FlowConfigLoggedException {
+    return updateFlowConfig(flowId, flowConfig, true);
+  }
+
+  /**
+   * Delete flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
+   */
+  public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header, 
boolean triggerListener) throws FlowConfigLoggedException {
+
+    log.info("[GAAS-REST] Delete called with flowGroup {} flowName {}", 
flowId.getFlowGroup(), flowId.getFlowName());
+    URI flowUri = null;
+
+    try {
+      flowUri = createFlowSpecUri(flowId);
+      this.flowCatalog.remove(flowUri, header, triggerListener);
+      return new UpdateResponse(HttpStatus.S_200_OK);
+    } catch (URISyntaxException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad 
URI " + flowUri, e);
+    }
+  }
+
+  /**
+   * Delete flowConfig locally and trigger all listeners
+   */
+  public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header)  
throws FlowConfigLoggedException {
+    return deleteFlowConfig(flowId, header, true);
+  }
+
+  public static URI createFlowSpecUri (FlowId flowId) throws 
URISyntaxException {
+    URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
+    URI flowUri = new URI(flowCatalogURI.getScheme(), 
flowCatalogURI.getAuthority(),
+        "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null);
+    return flowUri;
+  }
+
+  /**
+   * Build a {@link FlowSpec} from a {@link FlowConfig}
+   * @param flowConfig flow configuration
+   * @return {@link FlowSpec} created with attributes from flowConfig
+   */
+  public static FlowSpec createFlowSpecForConfig(FlowConfig flowConfig) {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, 
flowConfig.getId().getFlowGroup())
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
flowConfig.getId().getFlowName());
+
+    if (flowConfig.hasSchedule()) {
+      Schedule schedule = flowConfig.getSchedule();
+      configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, 
schedule.getCronSchedule());
+      configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, 
schedule.isRunImmediately());
+    }
+
+    Config config = configBuilder.build();
+    Config configWithFallback = 
config.withFallback(ConfigFactory.parseMap(flowConfig.getProperties()));
+
+    try {
+      URI templateURI = new URI(flowConfig.getTemplateUris());
+      return 
FlowSpec.builder().withConfig(configWithFallback).withTemplate(templateURI).build();
+    } catch (URISyntaxException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad 
URI " + flowConfig.getTemplateUris(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index 9074a43..8c91ca7 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -17,38 +17,22 @@
 
 package org.apache.gobblin.service;
 
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
-import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.CreateResponse;
-import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-
-import com.google.common.collect.ImmutableSet;
 
 
 /**
@@ -61,33 +45,18 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
 
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL")
-  public static FlowCatalog _globalFlowCatalog;
+  public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = 
null;
 
   @Inject
-  @Named("flowCatalog")
-  private FlowCatalog _flowCatalog;
+  @Named("flowConfigsResourceHandler")
+  private FlowConfigsResourceHandler flowConfigsResourceHandler;
 
   // For blocking use of this resource until it is ready
   @Inject
   @Named("readyToUse")
   private Boolean readyToUse = Boolean.FALSE;
 
-  public FlowConfigsResource() {}
-
-  /**
-   * Logs message and throws Rest.li exception
-   * @param status HTTP status code
-   * @param msg error message
-   * @param e exception
-   */
-  public void logAndThrowRestLiServiceException(HttpStatus status, String msg, 
Exception e) {
-    if (e != null) {
-      LOG.error(msg, e);
-      throw new RestLiServiceException(status, msg + " cause = " + 
e.getMessage());
-    } else {
-      LOG.error(msg);
-      throw new RestLiServiceException(status, msg);
-    }
+  public FlowConfigsResource() {
   }
 
   /**
@@ -99,82 +68,8 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
   public FlowConfig get(ComplexResourceKey<FlowId, EmptyRecord> key) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
-
-    LOG.info("Get called with flowGroup " + flowGroup + " flowName " + 
flowName);
-
-    try {
-      URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
-      URI flowUri = new URI(flowCatalogURI.getScheme(), 
flowCatalogURI.getAuthority(),
-          "/" + flowGroup + "/" + flowName, null, null);
-      FlowSpec spec = (FlowSpec) getFlowCatalog().getSpec(flowUri);
-      FlowConfig flowConfig = new FlowConfig();
-      Properties flowProps = spec.getConfigAsProperties();
-      Schedule schedule = null;
-
-      if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        schedule = new Schedule();
-        
schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
-      }
-      if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
-        
flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH));
-      } else if (spec.getTemplateURIs().isPresent()) {
-        
flowConfig.setTemplateUris(StringUtils.join(spec.getTemplateURIs().get(), ","));
-      } else {
-        flowConfig.setTemplateUris("NA");
-      }
-      if (schedule != null) {
-        if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) {
-          
schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)));
-        }
-
-        flowConfig.setSchedule(schedule);
-      }
-
-      // remove keys that were injected as part of flowSpec creation
-      flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
-      flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
-
-      StringMap flowPropsAsStringMap = new StringMap();
-      flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
-
-      return flowConfig.setId(new 
FlowId().setFlowGroup(flowGroup).setFlowName(flowName))
-          .setProperties(flowPropsAsStringMap);
-    } catch (URISyntaxException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI 
" + flowName, e);
-    } catch (SpecNotFoundException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow 
requested does not exist: " + flowName, null);
-    }
-
-    return null;
-  }
-
-  /**
-   * Build a {@link FlowSpec} from a {@link FlowConfig}
-   * @param flowConfig flow configuration
-   * @return {@link FlowSpec} created with attributes from flowConfig
-   */
-  private FlowSpec createFlowSpecForConfig(FlowConfig flowConfig) {
-    ConfigBuilder configBuilder = ConfigBuilder.create()
-        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, 
flowConfig.getId().getFlowGroup())
-        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
flowConfig.getId().getFlowName());
-
-    if (flowConfig.hasSchedule()) {
-      Schedule schedule = flowConfig.getSchedule();
-      configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, 
schedule.getCronSchedule());
-      configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, 
schedule.isRunImmediately());
-    }
-
-    Config config = configBuilder.build();
-    Config configWithFallback = 
config.withFallback(ConfigFactory.parseMap(flowConfig.getProperties()));
-
-    URI templateURI = null;
-    try {
-      templateURI = new URI(flowConfig.getTemplateUris());
-    } catch (URISyntaxException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI 
" + flowConfig.getTemplateUris(), e);
-    }
-
-    return 
FlowSpec.builder().withConfig(configWithFallback).withTemplate(templateURI).build();
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().getFlowConfig(flowId);
   }
 
   /**
@@ -184,18 +79,7 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
    */
   @Override
   public CreateResponse create(FlowConfig flowConfig) {
-    LOG.info("Create called with flowName " + 
flowConfig.getId().getFlowName());
-
-    LOG.debug("ReadyToUse is: " + readyToUse);
-    LOG.debug("FlowCatalog is: " + getFlowCatalog());
-
-    if (!readyToUse && getFlowCatalog() == null) {
-      throw new RuntimeException("Not ready for use.");
-    }
-
-    getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
-
-    return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new 
EmptyRecord()), HttpStatus.S_201_CREATED);
+    return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
   }
 
   /**
@@ -209,46 +93,29 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
   public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, 
FlowConfig flowConfig) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
-
-    LOG.info("Update called with flowGroup " + flowGroup + " flowName " + 
flowName);
-
-    if (!flowGroup.equals(flowConfig.getId().getFlowGroup()) || 
!flowName.equals(flowConfig.getId().getFlowName())) {
-      logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST,
-          "flowName and flowGroup cannot be changed in update", null);
-    }
-
-      getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
-
-      return new UpdateResponse(HttpStatus.S_200_OK);
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().updateFlowConfig(flowId, 
flowConfig);
   }
 
   /**
    * Delete a configured flow. Running flows are not affected. The schedule 
will be removed for scheduled flows.
-   * @param key composite key containing flow group and flow name that 
identifies the flow to remove from the
-   * {@link FlowCatalog}
+   * @param key composite key containing flow group and flow name that 
identifies the flow to remove from the flow catalog
    * @return {@link UpdateResponse}
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowId, EmptyRecord> key) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
-    URI flowUri = null;
-
-    LOG.info("Delete called with flowGroup " + flowGroup + " flowName " + 
flowName);
-
-    try {
-      URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
-      flowUri = new URI(flowCatalogURI.getScheme(), 
flowCatalogURI.getAuthority(),
-          "/" + flowGroup + "/" + flowName, null, null);
-
-      getFlowCatalog().remove(flowUri, getHeaders());
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, 
getHeaders());
+  }
 
-      return new UpdateResponse(HttpStatus.S_200_OK);
-    } catch (URISyntaxException e) {
-      logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI 
" + flowUri, e);
+  private FlowConfigsResourceHandler getFlowConfigResourceHandler() {
+    if (global_flowConfigsResourceHandler != null) {
+      return global_flowConfigsResourceHandler;
     }
 
-    return null;
+    return flowConfigsResourceHandler;
   }
 
   private Properties getHeaders() {
@@ -260,18 +127,5 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
     }
     return headerProperties;
   }
-
-  /***
-   * This method is to workaround injection issues where Service has only one 
active global FlowCatalog
-   * .. and is not able to inject it via RestLI bootstrap. We should remove 
this and make injected
-   * .. FlowCatalog standard after injection works and recipe is documented 
here.
-   * @return FlowCatalog in use.
-   */
-  private FlowCatalog getFlowCatalog() {
-    if (null != _globalFlowCatalog) {
-      return _globalFlowCatalog;
-    }
-    return this._flowCatalog;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
new file mode 100644
index 0000000..b92ab9d
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.util.Properties;
+
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+
+
+public interface FlowConfigsResourceHandler {
+  /**
+   * Get flow config
+   */
+  FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException;
+
+  /**
+   * Add flow config can be done locally only iff current node is a master
+   */
+  CreateResponse createFlowConfig(FlowConfig flowConfig) throws 
FlowConfigLoggedException;
+
+  /**
+   * Update flow config can be done locally only iff current node is a master
+   */
+  UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws 
FlowConfigLoggedException;
+
+  /**
+   * Delete flow config can be done locally only iff current node is a master
+   */
+  UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws 
FlowConfigLoggedException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index f9ae420..3422d66 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -25,25 +25,25 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import javax.annotation.Nonnull;
-
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
+import javax.annotation.Nonnull;
+
+import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.api.MutableSpecCatalog;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalog;
@@ -91,8 +91,7 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
       this.metricContext = 
realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
       this.metrics = new MutableStandardMetrics(this, Optional.of(config));
       this.addListener(this.metrics);
-    }
-    else {
+    } else {
       this.metricContext = null;
       this.metrics = null;
     }
@@ -240,8 +239,7 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     }
   }
 
-  @Override
-  public void put(Spec spec) {
+  public void put(Spec spec, boolean triggerListener) {
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
       Preconditions.checkNotNull(spec);
@@ -251,18 +249,29 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
           ((FlowSpec) spec).getConfigAsProperties()));
       specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      this.listeners.onAddSpec(spec);
+      if (triggerListener) {
+        this.listeners.onAddSpec(spec);
+      }
     } catch (IOException e) {
       throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
     }
   }
 
+  @Override
+  public void put(Spec spec) {
+    put(spec, true);
+  }
+
   public void remove(URI uri) {
     remove(uri, new Properties());
   }
 
   @Override
   public void remove(URI uri, Properties headers) {
+    this.remove(uri, headers, true);
+  }
+
+  public void remove(URI uri, Properties headers, boolean triggerListener) {
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
       Preconditions.checkNotNull(uri);
@@ -270,8 +279,9 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
       log.info(String.format("Removing FlowSpec with URI: %s", uri));
       specStore.deleteSpec(uri);
       this.metrics.updateRemoveSpecTime(startTime);
-      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, 
headers);
-
+      if (triggerListener) {
+        this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, 
headers);
+      }
     } catch (IOException e) {
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: 
" + uri, e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
new file mode 100644
index 0000000..d02d861
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
+import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
+/**
+ * A custom {@link MessageHandlerFactory} for {@link 
org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory}s
 that
+ * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
+ */
+@AllArgsConstructor
+class ControllerUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+  private boolean flowCatalogLocalCommit;
+  private GobblinServiceJobScheduler jobScheduler;
+  private GobblinServiceFlowConfigResourceHandler resourceHandler;
+  private String serviceName;
+
+  @Override
+  public MessageHandler createHandler(Message message, NotificationContext 
context) {
+    return new ControllerUserDefinedMessageHandler(message, context, 
serviceName, flowCatalogLocalCommit, jobScheduler, resourceHandler);
+  }
+
+  @Override
+  public String getMessageType() {
+    return Message.MessageType.USER_DEFINE_MSG.toString();
+  }
+
+  public List<String> getMessageTypes() {
+    return Collections.singletonList(getMessageType());
+  }
+
+  @Override
+  public void reset() {
+
+  }
+
+  /**
+   * A custom {@link MessageHandler} for handling user-defined messages to the 
controller.
+   */
+  @Slf4j
+  private static class ControllerUserDefinedMessageHandler extends 
MessageHandler {
+    private boolean flowCatalogLocalCommit;
+    private GobblinServiceJobScheduler jobScheduler;
+    private GobblinServiceFlowConfigResourceHandler resourceHandler;
+    private String serviceName;
+
+    public ControllerUserDefinedMessageHandler(Message message, 
NotificationContext context, String serviceName,
+        boolean flowCatalogLocalCommit, GobblinServiceJobScheduler scheduler,
+        GobblinServiceFlowConfigResourceHandler resourceHandler) {
+      super(message, context);
+      this.serviceName = serviceName;
+      this.flowCatalogLocalCommit = flowCatalogLocalCommit;
+      this.jobScheduler = scheduler;
+      this.resourceHandler = resourceHandler;
+    }
+
+    /**
+     * Method to handle add flow config message forwarded by Helix (Standby) 
node.
+     * In load balance mode, the FlowCatalog I/O was handled on standby when 
receiving Restli, so only need to handle
+     * {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onAddSpec(Spec)} part.
+     * Otherwise, we have to handle both FlowCatalog I/O and {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onAddSpec(Spec)}.
+     *
+     * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It will handle 
both FlowCatalog I/O and
+     * {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onAddSpec(Spec)} in 
non-balance mode.
+     */
+    private void handleAdd(String msg)
+        throws IOException {
+      FlowConfig config = FlowConfigUtils.deserializeFlowConfig(msg);
+      if (this.flowCatalogLocalCommit) {
+        // in balance mode, flow spec is already added in flow catalog on 
standby node.
+        FlowSpec flowSpec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(config);
+        log.info("Only handle add {} scheduling because flow catalog is 
committed locally on standby.", flowSpec);
+        jobScheduler.onAddSpec(flowSpec);
+      } else {
+        resourceHandler.createFlowConfig(config);
+      }
+    }
+
+    /**
+     * Method to handle add flow config message forwarded by Helix (Standby) 
node.
+     * In load balance mode, the FlowCatalog I/O was handled on standby when 
receiving Restli, so only need to handle
+     * {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onUpdateSpec(Spec)} part.
+     * Otherwise, we have to handle both FlowCatalog I/O and {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onUpdateSpec(Spec)}.
+     *
+     * Please refer to {@link 
FlowConfigResourceLocalHandler#updateFlowConfig(FlowId, FlowConfig)}. It will 
handle both FlowCatalog I/O and
+     * {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onUpdateSpec(Spec)} in 
non-balance mode.
+     */
+    private void handleUpdate(String msg)
+        throws IOException {
+      FlowConfig config = FlowConfigUtils.deserializeFlowConfig(msg);
+      if (flowCatalogLocalCommit) {
+        // in balance mode, flow spec is already updated in flow catalog on 
standby node.
+        FlowSpec flowSpec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(config);
+        log.info("Only handle update {} scheduling because flow catalog is 
committed locally on standby.", flowSpec);
+        jobScheduler.onUpdateSpec(flowSpec);
+      } else {
+        resourceHandler.updateFlowConfig(config.getId(), config);
+      }
+    }
+
+    /**
+     * Method to handle add flow config message forwarded by Helix (Standby) 
node.
+     * In load balance mode, the FlowCatalog I/O was handled on standby when 
receiving Restli, so only need to handle
+     * {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onDeleteSpec(URI, String, 
Properties)} part.
+     * Otherwise, we have to handle both FlowCatalog I/O and {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onDeleteSpec(URI, String, 
Properties)}.
+     *
+     * Please refer to {@link 
FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)}. It will 
handle both FlowCatalog I/O and
+     * {@link 
org.apache.gobblin.runtime.api.SpecCatalogListener#onDeleteSpec(URI, String, 
Properties)} in non-balance mode.
+     */
+    private void handleDelete(String msg)
+        throws IOException {
+      try {
+        FlowId id = FlowConfigUtils.deserializeFlowId(msg);
+        if (flowCatalogLocalCommit) {
+          // in balance mode, flow spec is already deleted in flow catalog on 
standby node.
+          URI flowUri = FlowConfigResourceLocalHandler.createFlowSpecUri(id);
+          log.info("Only handle update {} scheduling because flow catalog is 
committed locally on standby.", flowUri);
+          jobScheduler.onDeleteSpec(flowUri, FlowSpec.Builder.DEFAULT_VERSION);
+        } else {
+          resourceHandler.deleteFlowConfig(id, new Properties());
+        }
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      if (jobScheduler.isActive()) {
+        // we want to make sure current node is in active state
+        String msg = _message.getAttribute(Message.Attributes.INNER_MESSAGE);
+        log.info("{} ControllerUserDefinedMessage received : {}, type {}", 
this.serviceName, msg, _message.getMsgSubType());
+        try {
+          if 
(_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) {
+            handleAdd(msg);
+          } else if 
(_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE)) {
+            handleDelete(msg);
+          } else if 
(_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE)) {
+            handleUpdate(msg);
+          }
+        } catch (IOException e) {
+          log.error("Cannot process Helix message.", e);
+          HelixTaskResult helixTaskResult = new HelixTaskResult();
+          helixTaskResult.setSuccess(false);
+          return helixTaskResult;
+        }
+      } else {
+        String msg = _message.getAttribute(Message.Attributes.INNER_MESSAGE);
+        log.error("ControllerUserDefinedMessage received but ignored due to 
not in active mode: {}, type {}", msg,
+            _message.getMsgSubType());
+      }
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+
+      return helixTaskResult;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      log.error(
+          String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index b20e3b7..09d7bb4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -25,8 +25,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,16 +48,17 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PullFileLoader;
 
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * Service that monitors for jobs from a git repository.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 3137c21..0daea79 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,29 +17,14 @@
 
 package org.apache.gobblin.service.modules.core;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMetric;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.Schedule;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -52,21 +37,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.eventbus.EventBus;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -78,32 +58,42 @@ import com.linkedin.restli.server.resources.BaseResource;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ApplicationException;
 import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigsResource;
-import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.FlowConfigsResourceHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
-import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
 
 @Alpha
-public class GobblinServiceManager implements ApplicationLauncher, 
StandardMetricsBridge{
+public class GobblinServiceManager implements ApplicationLauncher, 
StandardMetricsBridge {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceManager.class);
 
@@ -115,7 +105,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
   protected final FileSystem fs;
   protected final Path serviceWorkDir;
-
+  protected final String serviceName;
   protected final String serviceId;
 
   protected final boolean isTopologyCatalogEnabled;
@@ -130,6 +120,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   protected FlowCatalog flowCatalog;
   @Getter
   protected GobblinServiceJobScheduler scheduler;
+  @Getter
+  protected GobblinServiceFlowConfigResourceHandler resourceHandler;
+
+  protected boolean flowCatalogLocalCommit;
   protected Orchestrator orchestrator;
   protected EmbeddedRestliServer restliServer;
   protected TopologySpecFactory topologySpecFactory;
@@ -142,7 +136,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
   @Getter
   protected Config config;
-
   private final MetricContext metricContext;
   private final Metrics metrics;
 
@@ -157,6 +150,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     this.config = config;
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
     this.metrics = new Metrics(this.metricContext, this.config);
+    this.serviceName = serviceName;
     this.serviceId = serviceId;
     this.serviceLauncher = new ServiceBasedAppLauncher(properties, 
serviceName);
 
@@ -177,6 +171,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
         ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
     if (isFlowCatalogEnabled) {
       this.flowCatalog = new FlowCatalog(config, Optional.of(LOGGER));
+      this.flowCatalogLocalCommit = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT,
+          ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT);
       this.serviceLauncher.addService(flowCatalog);
 
       this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
@@ -208,7 +204,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     if (isSchedulerEnabled) {
       this.orchestrator = new Orchestrator(config, 
Optional.of(this.topologyCatalog), Optional.of(LOGGER));
       SchedulerService schedulerService = new SchedulerService(properties);
-      this.scheduler = new GobblinServiceJobScheduler(config, 
this.helixManager,
+
+      this.scheduler = new GobblinServiceJobScheduler(this.serviceName, 
config, this.helixManager,
           Optional.of(this.flowCatalog), Optional.of(this.topologyCatalog), 
this.orchestrator,
           schedulerService, Optional.of(LOGGER));
       this.serviceLauncher.addService(schedulerService);
@@ -216,13 +213,19 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     }
 
     // Initialize RestLI
+    this.resourceHandler = new 
GobblinServiceFlowConfigResourceHandler(serviceName,
+        this.flowCatalogLocalCommit,
+        new FlowConfigResourceLocalHandler(this.flowCatalog),
+        this.helixManager,
+        this.scheduler);
+
     this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
     if (isRestLIServerEnabled) {
       Injector injector = Guice.createInjector(new Module() {
         @Override
         public void configure(Binder binder) {
-          
binder.bind(FlowCatalog.class).annotatedWith(Names.named("flowCatalog")).toInstance(flowCatalog);
+          
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler);
           
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
         }
       });
@@ -259,6 +262,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     }
   }
 
+  @VisibleForTesting
   public boolean isLeader() {
     // If helix manager is absent, then this standalone instance hence leader
     // .. else check if this master of cluster
@@ -411,7 +415,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
         this.helixManager.get()
             .getMessagingService()
             
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-                getUserDefinedMessageHandlerFactory());
+                new 
ControllerUserDefinedMessageHandlerFactory(flowCatalogLocalCommit, scheduler, 
resourceHandler, serviceName));
       }
     } catch (Exception e) {
       LOGGER.error("HelixManager failed to connect", e);
@@ -419,16 +423,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     }
   }
 
-  /**
-   * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
-   * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
-   *
-   * @returns a {@link MessageHandlerFactory}.
-   */
-  protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
-    return new ControllerUserDefinedMessageHandlerFactory(this.flowCatalog, 
this.scheduler);
-  }
-
   @VisibleForTesting
   void disconnectHelixManager() {
     if (isHelixManagerConnected()) {
@@ -475,92 +469,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     }
   }
 
-  /**
-   * A custom {@link MessageHandlerFactory} for {@link 
ControllerUserDefinedMessageHandler}s that
-   * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
-   */
-  private static class ControllerUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
-
-    private FlowCatalog flowCatalog;
-    private GobblinServiceJobScheduler jobScheduler;
-
-    public ControllerUserDefinedMessageHandlerFactory(FlowCatalog flowCatalog, 
GobblinServiceJobScheduler jobScheduler) {
-      this.flowCatalog = flowCatalog;
-      this.jobScheduler = jobScheduler;
-    }
-
-    @Override
-    public MessageHandler createHandler(Message message, NotificationContext 
context) {
-      return new ControllerUserDefinedMessageHandler(flowCatalog, 
jobScheduler, message, context);
-    }
-
-    @Override
-    public String getMessageType() {
-      return Message.MessageType.USER_DEFINE_MSG.toString();
-    }
-
-    public List<String> getMessageTypes() {
-      return Collections.singletonList(getMessageType());
-    }
-
-    @Override
-    public void reset() {
-
-    }
-
-    /**
-     * A custom {@link MessageHandler} for handling user-defined messages to 
the controller.
-     */
-    private static class ControllerUserDefinedMessageHandler extends 
MessageHandler {
-
-      private FlowCatalog flowCatalog;
-      private GobblinServiceJobScheduler jobScheduler;
-
-      public ControllerUserDefinedMessageHandler(FlowCatalog flowCatalog, 
GobblinServiceJobScheduler jobScheduler,
-          Message message, NotificationContext context) {
-        super(message, context);
-
-        this.flowCatalog = flowCatalog;
-        this.jobScheduler = jobScheduler;
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        if (jobScheduler.isActive()) {
-          String flowSpecUri = 
_message.getAttribute(Message.Attributes.INNER_MESSAGE);
-          LOGGER.info ("ControllerUserDefinedMessage received : {}, type {}", 
flowSpecUri, _message.getMsgSubType());
-          try {
-            if 
(_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) {
-              Spec spec = flowCatalog.getSpec(new URI(flowSpecUri));
-              this.jobScheduler.onAddSpec(spec);
-            } else if 
(_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE)) {
-              List<String> flowSpecUriParts = 
Splitter.on(":").omitEmptyStrings().trimResults().splitToList(flowSpecUri);
-              this.jobScheduler.onDeleteSpec(new URI(flowSpecUriParts.get(0)), 
flowSpecUriParts.get(1));
-            } else if 
(_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE)) {
-              Spec spec = flowCatalog.getSpec(new URI(flowSpecUri));
-              this.jobScheduler.onUpdateSpec(spec);
-            }
-          } catch (SpecNotFoundException | URISyntaxException e) {
-            LOGGER.error("Cannot process Helix message for flowSpecUri: " + 
flowSpecUri, e);
-          }
-        } else {
-          String flowSpecUri = 
_message.getAttribute(Message.Attributes.INNER_MESSAGE);
-          LOGGER.info ("ControllerUserDefinedMessage received but ignored due 
to not in active mode: {}, type {}", flowSpecUri, _message.getMsgSubType());
-        }
-        HelixTaskResult helixTaskResult = new HelixTaskResult();
-        helixTaskResult.setSuccess(true);
-
-        return helixTaskResult;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(
-            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
-      }
-    }
-  }
-
   private static String getServiceId() {
     return "1";
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
new file mode 100644
index 0000000..a59fc85
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.collect.Maps;
+import com.linkedin.data.template.StringMap;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+public class FlowConfigUtils {
+  private static final String FLOWCONFIG = "fc";
+  private static final String FLOWCONFIG_ID = FLOWCONFIG + '-' + "id";
+  private static final String FLOWCONFIG_ID_NAME = FLOWCONFIG_ID + '-' + 
"name";
+  private static final String FLOWCONFIG_ID_GROUP = FLOWCONFIG_ID + '-' + 
"group";
+
+  private static final String FLOWCONFIG_SCHEDULE = FLOWCONFIG + '-' + "sch";
+  private static final String FLOWCONFIG_SCHEDULE_CRON = FLOWCONFIG_SCHEDULE + 
'-' + "cron";
+  private static final String FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY = 
FLOWCONFIG_SCHEDULE + '-' + "runImmediately";
+
+  private static final String FLOWCONFIG_TEMPLATEURIS = FLOWCONFIG + '-' + 
"templateUris";
+
+  public static String serializeFlowId(FlowId id) throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty(FLOWCONFIG_ID_NAME, id.getFlowName());
+    properties.setProperty(FLOWCONFIG_ID_GROUP, id.getFlowGroup());
+
+    return PropertiesUtils.serialize(properties);
+  }
+
+  public static FlowId deserializeFlowId(String serialized) throws IOException 
{
+    Properties properties = PropertiesUtils.deserialize(serialized);
+    FlowId id =  new FlowId();
+    id.setFlowName(properties.getProperty(FLOWCONFIG_ID_NAME));
+    id.setFlowGroup(properties.getProperty(FLOWCONFIG_ID_GROUP));
+    return id;
+  }
+
+  public static String serializeFlowConfig(FlowConfig flowConfig) throws 
IOException {
+    Properties properties = 
ConfigUtils.configToProperties(ConfigFactory.parseMap(flowConfig.getProperties()));
+    properties.setProperty(FLOWCONFIG_ID_NAME, 
flowConfig.getId().getFlowName());
+    properties.setProperty(FLOWCONFIG_ID_GROUP, 
flowConfig.getId().getFlowGroup());
+
+    if (flowConfig.hasSchedule()) {
+      properties.setProperty(FLOWCONFIG_SCHEDULE_CRON, 
flowConfig.getSchedule().getCronSchedule());
+      properties.setProperty(FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY, 
Boolean.toString(flowConfig.getSchedule().isRunImmediately()));
+    }
+
+    if (flowConfig.hasTemplateUris()) {
+      properties.setProperty(FLOWCONFIG_TEMPLATEURIS, 
flowConfig.getTemplateUris());
+    }
+
+    return PropertiesUtils.serialize(properties);
+  }
+
+  public static FlowConfig deserializeFlowConfig(String serialized) throws 
IOException {
+    Properties properties = PropertiesUtils.deserialize(serialized);
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId()
+        .setFlowName(properties.getProperty(FLOWCONFIG_ID_NAME))
+        .setFlowGroup(properties.getProperty(FLOWCONFIG_ID_GROUP)));
+
+    if (properties.containsKey(FLOWCONFIG_SCHEDULE_CRON)) {
+      flowConfig.setSchedule(new Schedule()
+          .setCronSchedule(properties.getProperty(FLOWCONFIG_SCHEDULE_CRON))
+          
.setRunImmediately(Boolean.valueOf(properties.getProperty(FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY))));
+    }
+
+    if (properties.containsKey(FLOWCONFIG_TEMPLATEURIS)) {
+      
flowConfig.setTemplateUris(properties.getProperty(FLOWCONFIG_TEMPLATEURIS));
+    }
+
+    properties.remove(FLOWCONFIG_ID_NAME);
+    properties.remove(FLOWCONFIG_ID_GROUP);
+    properties.remove(FLOWCONFIG_SCHEDULE_CRON);
+    properties.remove(FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY);
+    properties.remove(FLOWCONFIG_TEMPLATEURIS);
+
+    flowConfig.setProperties(new StringMap(Maps.fromProperties(properties)));
+
+    return flowConfig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
new file mode 100644
index 0000000..55d9cf0
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+
+import com.google.common.base.Optional;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigLoggedException;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsResourceHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+
+
+/**
+ * A high available flow config resource handler which consider the leadership 
change.
+ * When a non-master status detected, it will forward the rest-li request
+ * to the master node. Otherwise it will handle it locally.
+ */
+@Slf4j
+public class GobblinServiceFlowConfigResourceHandler implements 
FlowConfigsResourceHandler {
+  @Getter
+  private String serviceName;
+  private boolean flowCatalogLocalCommit;
+  private FlowConfigResourceLocalHandler localHandler;
+  private Optional<HelixManager> helixManager;
+  private GobblinServiceJobScheduler jobScheduler;
+
+  public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean 
flowCatalogLocalCommit,
+      FlowConfigResourceLocalHandler handler,
+      Optional<HelixManager> manager,
+      GobblinServiceJobScheduler jobScheduler) {
+    this.flowCatalogLocalCommit = flowCatalogLocalCommit;
+    this.serviceName = serviceName;
+    this.localHandler = handler;
+    this.helixManager = manager;
+    this.jobScheduler = jobScheduler;
+  }
+
+  @Override
+  public FlowConfig getFlowConfig(FlowId flowId)
+      throws FlowConfigLoggedException {
+    return this.localHandler.getFlowConfig(flowId);
+  }
+
+  /**
+   * Method to handle create Restli request.
+   * In load balance mode, we will handle flowCatalog I/O locally before 
forwarding the message to Helix (Active) node.
+   * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle 
flowCatalog I/O.
+   *
+   * The listeners of {@link 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in 
balance mode.
+   */
+  @Override
+  public CreateResponse createFlowConfig(FlowConfig flowConfig)
+      throws FlowConfigLoggedException {
+    String flowName = flowConfig.getId().getFlowName();
+    String flowGroup = flowConfig.getId().getFlowGroup();
+
+    checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, 
flowGroup);
+
+    try {
+      if (!jobScheduler.isActive() && helixManager.isPresent()) {
+        if (this.flowCatalogLocalCommit) {
+          // We will handle FS I/O locally for load balance before forwarding 
to remote node.
+          this.localHandler.createFlowConfig(flowConfig, false);
+        }
+
+        forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, 
FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
+
+        // Do actual work on remote node, directly return success
+        return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), 
new EmptyRecord()), HttpStatus.S_201_CREATED);
+      } else {
+        return this.localHandler.createFlowConfig(flowConfig);
+      }
+    } catch (IOException e) {
+      throw new 
FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
+          "Cannot create flowConfig [flowName=" + flowName + " flowGroup=" + 
flowGroup + "]", e);
+    }
+  }
+
+  /**
+   * Method to handle update Restli request.
+   * In load balance mode, we will handle flowCatalog I/O locally before 
forwarding the message to Helix (Active) node.
+   * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle 
flowCatalog I/O.
+   *
+   * The listeners of {@link 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in 
balance mode.
+   */
+  @Override
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig)
+      throws FlowConfigLoggedException {
+    String flowName = flowId.getFlowName();
+    String flowGroup = flowId.getFlowGroup();
+
+    if (!flowGroup.equals(flowConfig.getId().getFlowGroup()) || 
!flowName.equals(flowConfig.getId().getFlowName())) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, flowName, 
flowGroup);
+
+    try {
+      if (!jobScheduler.isActive() && helixManager.isPresent()) {
+
+        if (this.flowCatalogLocalCommit) {
+          // We will handle FS I/O locally for load balance before forwarding 
to remote node.
+          this.localHandler.updateFlowConfig(flowId, flowConfig, false);
+        }
+
+        forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, 
FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
+
+        // Do actual work on remote node, directly return success
+        log.info("Forwarding update flowConfig [flowName=" + flowName + " 
flowGroup=" + flowGroup + "]");
+        return new UpdateResponse(HttpStatus.S_200_OK);
+      } else {
+        return this.localHandler.updateFlowConfig(flowId, flowConfig);
+      }
+
+    } catch (IOException e) {
+      throw new 
FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
+          "Cannot update flowConfig [flowName=" + flowName + " flowGroup=" + 
flowGroup + "]", e);
+    }
+  }
+
+  /**
+   * Method to handle delete Restli request.
+   * In load balance mode, we will handle flowCatalog I/O locally before 
forwarding the message to Helix (Active) node.
+   * Please refer to {@link 
FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle 
flowCatalog I/O.
+   *
+   * The listeners of {@link 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in 
balance mode.
+   */
+  @Override
+  public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header)
+      throws FlowConfigLoggedException {
+    String flowName = flowId.getFlowName();
+    String flowGroup = flowId.getFlowGroup();
+
+    checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, flowName, 
flowGroup);
+
+    try {
+      if (!jobScheduler.isActive() && helixManager.isPresent()) {
+
+        if (this.flowCatalogLocalCommit) {
+          // We will handle FS I/O locally for load balance before forwarding 
to remote node.
+          this.localHandler.deleteFlowConfig(flowId, header, false);
+        }
+
+        forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, 
FlowConfigUtils.serializeFlowId(flowId), flowName, flowGroup);
+
+        return new UpdateResponse(HttpStatus.S_200_OK);
+      } else {
+        return this.localHandler.deleteFlowConfig(flowId, header);
+      }
+    } catch (IOException e) {
+      throw new 
FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
+          "Cannot delete flowConfig [flowName=" + flowName + " flowGroup=" + 
flowGroup + "]", e);
+    }
+  }
+
+  private void checkHelixConnection(String opr, String flowName, String 
flowGroup) throws FlowConfigLoggedException {
+    if (this.helixManager.isPresent() && 
!this.helixManager.get().isConnected()) {
+      // Specs in store will be notified when Scheduler is added as listener 
to FlowCatalog, so ignore
+      // .. Specs if in cluster mode and Helix is not yet initialized
+      log.warn("System not yet initialized. Skipping operation " + opr);
+      throw new 
FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
+          "System not yet initialized. Skipping " + opr + " flowConfig 
[flowName=" + flowName + " flowGroup=" + flowGroup + "]");
+    }
+  }
+
+  private void forwardMessage(String msgSubType, String val, String flowName, 
String flowGroup) {
+    HelixUtils.sendUserDefinedMessage(msgSubType, val, 
UUID.randomUUID().toString(), InstanceType.CONTROLLER,
+        helixManager.get(), log);
+    log.info("{} Forwarding {} flowConfig [flowName={} flowGroup={}", 
serviceName, msgSubType, flowName, flowGroup + "]");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 328c742..506a53e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -22,14 +22,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 
-import java.util.UUID;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
 import org.quartz.DisallowConcurrentExecution;
 import org.quartz.InterruptableJob;
 import org.quartz.JobDataMap;
@@ -45,6 +39,9 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
@@ -57,7 +54,6 @@ import 
org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
-import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -78,29 +74,29 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   protected final Orchestrator orchestrator;
   @Getter
   protected final Map<String, Spec> scheduledFlowSpecs;
-
   @Getter
-  protected volatile boolean isActive;
+  private volatile boolean isActive;
+  private String serviceName;
 
-  public GobblinServiceJobScheduler(Config config, Optional<HelixManager> 
helixManager,
+  public GobblinServiceJobScheduler(String serviceName, Config config, 
Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Orchestrator orchestrator,
       SchedulerService schedulerService, Optional<Logger> log)
       throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-
+    this.serviceName = serviceName;
     this.flowCatalog = flowCatalog;
     this.helixManager = helixManager;
     this.orchestrator = orchestrator;
     this.scheduledFlowSpecs = Maps.newHashMap();
   }
 
-  public GobblinServiceJobScheduler(Config config, Optional<HelixManager> 
helixManager,
+  public GobblinServiceJobScheduler(String serviceName, Config config, 
Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, SchedulerService schedulerService,
       Optional<Logger> log)
       throws Exception {
-    this(config, helixManager, flowCatalog, topologyCatalog, new 
Orchestrator(config, topologyCatalog, log),
+    this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new 
Orchestrator(config, topologyCatalog, log),
         schedulerService, log);
   }
 
@@ -196,13 +192,6 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     _log.info("New Flow Spec detected: " + addedSpec);
 
     if (addedSpec instanceof FlowSpec) {
-      if (!isActive && helixManager.isPresent()) {
-        _log.info("Scheduler running in slave mode, forward Spec add via Helix 
message to master: " + addedSpec);
-        
HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, 
addedSpec.getUri().toString(),
-            UUID.randomUUID().toString(), InstanceType.CONTROLLER, 
helixManager.get(), _log);
-        return;
-      }
-
       try {
         Properties jobConfig = new Properties();
         Properties flowSpecProperties = ((FlowSpec) 
addedSpec).getConfigAsProperties();
@@ -221,7 +210,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
         this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
 
         if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-          _log.info("Scheduling flow spec: " + addedSpec);
+          _log.info("{} Scheduling flow spec: {} ", this.serviceName, 
addedSpec);
           scheduleJob(jobConfig, null);
           if (PropertiesUtils.getPropAsBoolean(jobConfig, 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
             _log.info("RunImmediately requested, hence executing FlowSpec: " + 
addedSpec);
@@ -232,7 +221,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           this.jobExecutor.execute(new NonScheduledJobRunner(jobConfig, null));
         }
       } catch (JobException je) {
-        _log.error("Failed to schedule or run FlowSpec " + addedSpec, je);
+        _log.error("{} Failed to schedule or run FlowSpec {}", serviceName,  
addedSpec, je);
       }
     }
   }
@@ -252,14 +241,6 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
     _log.info("Spec deletion detected: " + deletedSpecURI + "/" + 
deletedSpecVersion);
 
-    if (!isActive && helixManager.isPresent()) {
-      _log.info("Scheduler running in slave mode, forward Spec delete via 
Helix message to master: " + deletedSpecURI);
-      
HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE,
-          deletedSpecURI.toString() + ":" + deletedSpecVersion, 
UUID.randomUUID().toString(), InstanceType.CONTROLLER,
-          helixManager.get(), _log);
-      return;
-    }
-
     try {
       Spec deletedSpec = 
this.scheduledFlowSpecs.get(deletedSpecURI.toString());
       if (null != deletedSpec) {
@@ -292,13 +273,6 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       return;
     }
 
-    if (!isActive && helixManager.isPresent()) {
-      _log.info("Scheduler running in slave mode, forward Spec update via 
Helix message to master: " + updatedSpec);
-      
HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, 
updatedSpec.getUri().toString(),
-          UUID.randomUUID().toString(), InstanceType.CONTROLLER, 
helixManager.get(), _log);
-      return;
-    }
-
     try {
       onDeleteSpec(updatedSpec.getUri(), updatedSpec.getVersion());
     } catch (Exception e) {

Reply via email to