This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new aadf53d  [GOBBLIN-696] Provide an "explain" option to return a 
compiled flow wh…
aadf53d is described below

commit aadf53dbb8acc5a3de0249d3695384fdefa4eb35
Author: suvasude <[email protected]>
AuthorDate: Sun Mar 17 19:52:52 2019 -0700

    [GOBBLIN-696] Provide an "explain" option to return a compiled flow wh…
    
    Closes #2567 from sv2000/dagPlan1
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 ...che.gobblin.service.flowconfigsV2.restspec.json |  3 ++
 .../org/apache/gobblin/service/FlowConfig.pdsc     |  6 +++
 ...pache.gobblin.service.flowconfigs.snapshot.json |  5 +++
 ...he.gobblin.service.flowconfigsV2.snapshot.json} | 41 ++++++++++++-----
 .../apache/gobblin/service/FlowConfigV2Client.java | 10 ++---
 .../apache/gobblin/service/FlowConfigV2Test.java   |  7 ++-
 .../service/FlowConfigResourceLocalHandler.java    | 11 +++++
 .../service/FlowConfigV2ResourceLocalHandler.java  | 35 ++++++++++++---
 .../gobblin/service/FlowConfigsV2Resource.java     | 40 ++++++++++++++---
 .../gobblin/runtime/api/MutableSpecCatalog.java    | 14 ++++--
 .../apache/gobblin/runtime/api/SpecCatalog.java    |  4 +-
 .../gobblin/runtime/api/SpecCatalogListener.java   | 19 +++++---
 .../runtime/spec_catalog/AddSpecResponse.java      | 34 +++++++++++++++
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 18 +++++---
 .../spec_catalog/SpecCatalogListenersList.java     |  9 ++--
 .../runtime/spec_catalog/TopologyCatalog.java      | 24 +++++++---
 .../modules/core/GobblinServiceManager.java        |  6 +++
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |  4 +-
 .../gobblin/service/modules/flowgraph/Dag.java     | 16 +++++++
 .../modules/orchestration/Orchestrator.java        |  5 ++-
 .../GobblinServiceFlowConfigResourceHandler.java   | 11 +++--
 .../scheduler/GobblinServiceJobScheduler.java      | 51 ++++++++++++++++------
 .../service/modules/spec/JobExecutionPlan.java     | 11 ++++-
 24 files changed, 310 insertions(+), 75 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 9004cff..3bcbb69 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -135,6 +135,7 @@ public class ConfigurationKeys {
   public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
   public static final String FLOW_APPLY_RETENTION = "flow.applyRetention";
   public static final String FLOW_APPLY_INPUT_RETENTION = 
"flow.applyInputRetention";
+  public static final String FLOW_EXPLAIN_KEY = "flow.explain";
 
   /**
    * Common topology configuration properties.
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index e45718e..8aa0726 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -12,6 +12,9 @@
     },
     "supports" : [ "create", "delete", "get", "update" ],
     "methods" : [ {
+      "annotations" : {
+        "returnEntity" : { }
+      },
       "method" : "create",
       "doc" : "Create a flow configuration that the service will forward to 
execution instances for execution"
     }, {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
index 19ac690..09af4f1 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
@@ -24,6 +24,12 @@
       }
     },
     {
+      "name" : "explain",
+      "type" : "boolean",
+      "default" : false,
+      "doc" : "Return the compiled flow as a string. If enabled, the flow is 
not added."
+    },
+    {
       "name" : "properties",
       "type" : { "type" : "map", "values" : "string" },
       "doc" : "Properties for the flow. These properties are passed to the 
compiled Gobblin jobs."
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
index 6106b7a..9485f8d 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
@@ -65,6 +65,11 @@
         "org.apache.gobblin.service.validator.TemplateUriValidator" : { }
       }
     }, {
+      "name" : "explain",
+      "type" : "boolean",
+      "doc" : "Return the compiled flow as a string. If enabled, the flow is 
not added.",
+      "default" : false
+    }, {
       "name" : "properties",
       "type" : {
         "type" : "map",
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
similarity index 77%
copy from 
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
copy to 
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 6106b7a..99ea6fd 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -65,6 +65,11 @@
         "org.apache.gobblin.service.validator.TemplateUriValidator" : { }
       }
     }, {
+      "name" : "explain",
+      "type" : "boolean",
+      "doc" : "Return the compiled flow as a string. If enabled, the flow is 
not added.",
+      "default" : false
+    }, {
       "name" : "properties",
       "type" : {
         "type" : "map",
@@ -74,28 +79,40 @@
     } ]
   }, {
     "type" : "record",
-    "name" : "EmptyRecord",
-    "namespace" : "com.linkedin.restli.common",
-    "doc" : "An literally empty record.  Intended as a marker to indicate the 
absence of content where a record type is required.  If used the underlying 
DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. 
 For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no 
response body.   Also, a ComplexKeyResource implementation that has no ParamKey 
should have a signature like XyzResource implements ComplexKeyResource<XyzKey, 
EmptyRecord, Xyz>.",
-    "fields" : [ ],
-    "validate" : {
-      "com.linkedin.restli.common.EmptyRecordValidator" : { }
-    }
+    "name" : "FlowStatusId",
+    "namespace" : "org.apache.gobblin.service",
+    "doc" : "Identifier for a specific execution of a flow",
+    "fields" : [ {
+      "name" : "flowName",
+      "type" : "string",
+      "doc" : "Name of the flow"
+    }, {
+      "name" : "flowGroup",
+      "type" : "string",
+      "doc" : "Group of the flow. This defines the namespace for the flow."
+    }, {
+      "name" : "flowExecutionId",
+      "type" : "long",
+      "doc" : "Execution id for the flow"
+    } ]
   } ],
   "schema" : {
-    "name" : "flowconfigs",
+    "name" : "flowconfigsV2",
     "namespace" : "org.apache.gobblin.service",
-    "path" : "/flowconfigs",
+    "path" : "/flowconfigsV2",
     "schema" : "org.apache.gobblin.service.FlowConfig",
-    "doc" : "Resource for handling flow configuration requests\n\ngenerated 
from: org.apache.gobblin.service.FlowConfigsResource",
+    "doc" : "Resource for handling flow configuration requests\n\ngenerated 
from: org.apache.gobblin.service.FlowConfigsV2Resource",
     "collection" : {
       "identifier" : {
         "name" : "id",
         "type" : "org.apache.gobblin.service.FlowId",
-        "params" : "com.linkedin.restli.common.EmptyRecord"
+        "params" : "org.apache.gobblin.service.FlowStatusId"
       },
       "supports" : [ "create", "delete", "get", "update" ],
       "methods" : [ {
+        "annotations" : {
+          "returnEntity" : { }
+        },
         "method" : "create",
         "doc" : "Create a flow configuration that the service will forward to 
execution instances for execution"
       }, {
@@ -109,7 +126,7 @@
         "doc" : "Delete a configured flow. Running flows are not affected. The 
schedule will be removed for scheduled flows."
       } ],
       "entity" : {
-        "path" : "/flowconfigs/{id}"
+        "path" : "/flowconfigsV2/{id}"
       }
     }
   }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 1471883..94af118 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -35,7 +35,7 @@ import com.linkedin.r2.RemoteInvocationException;
 import com.linkedin.r2.transport.common.Client;
 import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
-import com.linkedin.restli.client.CreateIdRequest;
+import com.linkedin.restli.client.CreateIdEntityRequest;
 import com.linkedin.restli.client.DeleteRequest;
 import com.linkedin.restli.client.GetRequest;
 import com.linkedin.restli.client.Response;
@@ -44,7 +44,7 @@ import com.linkedin.restli.client.RestClient;
 import com.linkedin.restli.client.UpdateRequest;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.IdResponse;
+import com.linkedin.restli.common.IdEntityResponse;
 
 
 /**
@@ -99,9 +99,9 @@ public class FlowConfigV2Client implements Closeable {
     LOG.debug("createFlowConfig with groupName " + 
flowConfig.getId().getFlowGroup() + " flowName " +
         flowConfig.getId().getFlowName());
 
-    CreateIdRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> 
request =
-        _flowconfigsV2RequestBuilders.create().input(flowConfig).build();
-    ResponseFuture<IdResponse<ComplexResourceKey<FlowId, FlowStatusId>>> 
flowConfigResponseFuture =
+    CreateIdEntityRequest<ComplexResourceKey<FlowId, FlowStatusId>, 
FlowConfig> request =
+        _flowconfigsV2RequestBuilders.createAndGet().input(flowConfig).build();
+    ResponseFuture<IdEntityResponse<ComplexResourceKey<FlowId, FlowStatusId>, 
FlowConfig>> flowConfigResponseFuture =
         _restClient.get().sendRequest(request);
 
     return 
createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString());
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 821b947..9b43875 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -37,6 +37,7 @@ import com.google.inject.name.Names;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.server.resources.BaseResource;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -76,10 +77,12 @@ public class FlowConfigV2Test {
     Injector injector = Guice.createInjector(new Module() {
       @Override
       public void configure(Binder binder) {
-        
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(new
 FlowConfigV2ResourceLocalHandler(flowCatalog));
+        
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new
 FlowConfigV2ResourceLocalHandler(flowCatalog));
         // indicate that we are in unit testing since the resource is being 
blocked until flow catalog changes have
         // been made
-        
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
+        
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
+        
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(new
 NoopRequesterService(
+            ConfigFactory.empty()));
       }
     });
 
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 8ad0999..405e8e3 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -29,6 +29,7 @@ import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -108,6 +109,12 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
    */
   public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
     log.info("[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName());
+
+    if (flowConfig.hasExplain()) {
+      //Return Error if FlowConfig has explain set. Explain request is only 
valid for v2 FlowConfig.
+      return new CreateResponse(new 
RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "FlowConfig with explain 
not supported."));
+    }
+
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
     // Existence of a flow spec in the flow catalog implies that the flow is 
currently running.
     // If the new flow spec has a schedule we should allow submission of the 
new flow to accept the new schedule.
@@ -204,6 +211,10 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
       configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
String.valueOf(System.currentTimeMillis()));
     }
 
+    if (flowConfig.hasExplain()) {
+      configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXPLAIN_KEY, 
flowConfig.isExplain());
+    }
+
     Config config = configBuilder.build();
 
     Config configWithFallback;
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index bd77858..c0c7fa6 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -15,17 +15,25 @@
  * limitations under the License.
  */
 package org.apache.gobblin.service;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
-import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.CreateKVResponse;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 @Slf4j
 public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+  public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = 
"org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
+
   public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
     super(flowCatalog);
   }
@@ -33,10 +41,14 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
   /**
    * Add flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
    */
-  public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
-    log.info("[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName());
+  public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
+    String createLog = "[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName();
+    if (flowConfig.hasExplain()) {
+      createLog += " explain " + Boolean.toString(flowConfig.isExplain());
+    }
+    log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    this.flowCatalog.put(flowSpec, triggerListener);
+    Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, 
triggerListener);
     FlowStatusId flowStatusId = new FlowStatusId()
         
.setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
         
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
@@ -45,6 +57,19 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
     } else {
       flowStatusId.setFlowExecutionId(-1L);
     }
-    return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), HttpStatus.S_201_CREATED);
+    HttpStatus httpStatus = HttpStatus.S_201_CREATED;
+
+    if (flowConfig.hasExplain() && flowConfig.isExplain()) {
+      //This is an Explain request. So no resource is actually created.
+      //Enrich original FlowConfig entity by adding the compiledFlow to the 
properties map.
+      StringMap props = flowConfig.getProperties();
+      AddSpecResponse<String> addSpecResponse = 
responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+      props.put("gobblin.flow.compiled",
+          addSpecResponse != null ? 
StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
+      flowConfig.setProperties(props);
+      //Return response with 200 status code, since no resource is actually 
created.
+      httpStatus = HttpStatus.S_200_OK;
+    }
+    return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), flowConfig, httpStatus);
   }
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index fa6112c..2adf4a6 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -15,19 +15,29 @@
  * limitations under the License.
  */
 package org.apache.gobblin.service;
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
 import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.RestLiCollection;
+import com.linkedin.restli.server.annotations.ReturnEntity;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+
 /**
  * Resource for handling flow configuration requests
  */
@@ -35,6 +45,9 @@ import 
com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, 
FlowStatusId, FlowConfig> {
   private static final Logger LOG = 
LoggerFactory.getLogger(FlowConfigsV2Resource.class);
   public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = 
"flowConfigsV2ResourceHandler";
+  public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
+  public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
+
   private static final Set<String> ALLOWED_METADATA = 
ImmutableSet.of("delete.state.store");
 
 
@@ -45,10 +58,15 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
   @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME)
   private FlowConfigsResourceHandler flowConfigsResourceHandler;
 
+  // For getting who sends the request
+  @Inject
+  @Named(INJECT_REQUESTER_SERVICE)
+  private RequesterService requesterService;
+
   // For blocking use of this resource until it is ready
   @Inject
-  @Named("readyToUse")
-  private Boolean readyToUse = Boolean.FALSE;
+  @Named(INJECT_READY_TO_USE)
+  private Boolean readyToUse;
 
   public FlowConfigsV2Resource() {
   }
@@ -71,9 +89,19 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
    * @param flowConfig flow configuration
    * @return {@link CreateResponse}
    */
+  @ReturnEntity
   @Override
-  public CreateResponse create(FlowConfig flowConfig) {
-    return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
+  public CreateKVResponse create(FlowConfig flowConfig) {
+    List<ServiceRequester> requestorList = 
this.requesterService.findRequesters(this);
+    try {
+      String serialized = this.requesterService.serialize(requestorList);
+      flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, 
serialized);
+      LOG.info("Rest requester list is " + serialized);
+    } catch (IOException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
+          "cannot get who is the requester", e);
+    }
+    return (CreateKVResponse) 
this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
   }
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 7a3e946..2ddbf7e 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -19,18 +19,20 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareTimer;
-
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+
 
 /**
  * A {@link SpecCatalog} that can have its {@link Collection} of {@link Spec}s 
modified
@@ -41,8 +43,12 @@ public interface MutableSpecCatalog extends SpecCatalog {
   /**
    * Registers a new {@link Spec}. If a {@link Spec} with the same {@link 
Spec#getUri()} exists,
    * it will be replaced.
+   * @param spec
+   * @return a map containing an entry for each {@link SpecCatalogListener} of 
the {@link SpecCatalog}, for which an action is triggered
+   * on adding a {@link Spec} to the {@link SpecCatalog}. The key for each 
entry is the name of the {@link SpecCatalogListener}
+   * and the value is the result of the action taken by the listener 
returned as an instance of {@link AddSpecResponse}.
    * */
-  public void put(Spec spec);
+  public Map<String, AddSpecResponse> put(Spec spec);
 
   /**
    * Removes an existing {@link Spec} with the given URI.
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 0312c4e..05f63a0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -40,6 +40,7 @@ import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -119,9 +120,10 @@ public interface SpecCatalog extends 
SpecCatalogListenersContainer, Instrumentab
       Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), 
System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
 
-    @Override public void onAddSpec(Spec addedSpec) {
+    @Override public AddSpecResponse onAddSpec(Spec addedSpec) {
       this.totalAddedSpecs.incrementAndGet();
       submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
+      return new AddSpecResponse(null);
     }
 
     private void submitTrackingEvent(Spec spec, String operType) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
index 1448231..67f2e39 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
@@ -22,12 +22,13 @@ import java.util.Properties;
 
 import com.google.common.base.Objects;
 
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.util.callbacks.Callback;
 
 public interface SpecCatalogListener {
   /** Invoked when a new {@link Spec} is added to the catalog and for all 
pre-existing specs on registration
    * of the listener.*/
-  void onAddSpec(Spec addedSpec);
+  AddSpecResponse onAddSpec(Spec addedSpec);
 
   /**
    * Invoked when a {@link Spec} gets removed from the catalog.
@@ -40,16 +41,15 @@ public interface SpecCatalogListener {
   public void onUpdateSpec(Spec updatedSpec);
 
   /** A standard implementation of onAddSpec as a functional object */
-  public static class AddSpecCallback extends Callback<SpecCatalogListener, 
Void> {
+  public static class AddSpecCallback extends Callback<SpecCatalogListener, 
AddSpecResponse> {
     private final Spec _addedSpec;
     public AddSpecCallback(Spec addedSpec) {
       super(Objects.toStringHelper("onAddSpec").add("addedSpec", 
addedSpec).toString());
       _addedSpec = addedSpec;
     }
 
-    @Override public Void apply(SpecCatalogListener listener) {
-      listener.onAddSpec(_addedSpec);
-      return null;
+    @Override public AddSpecResponse apply(SpecCatalogListener listener) {
+      return listener.onAddSpec(_addedSpec);
     }
   }
 
@@ -89,4 +89,13 @@ public interface SpecCatalogListener {
       return null;
     }
   }
+
+  /**
+   * A default implementation to return the name of the {@link 
SpecCatalogListener}.
+   * @return the name of this listener; this default implementation returns the runtime class name
+   */
+  default String getName() {
+    return getClass().getName();
+  }
+
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/AddSpecResponse.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/AddSpecResponse.java
new file mode 100644
index 0000000..9d83e34
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/AddSpecResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.spec_catalog;
+
+/**
+ * An immutable holder that allows listeners of a {@link org.apache.gobblin.runtime.api.SpecCatalog} to return a response
+ * when a {@link org.apache.gobblin.runtime.api.Spec} is added to the {@link org.apache.gobblin.runtime.api.SpecCatalog}.
+ * @param <T> the type of the value carried by the response; the value is stored as given and may be {@code null}.
+ */
+public class AddSpecResponse<T> {
+  private final T value;
+
+  public AddSpecResponse(T value) {
+    this.value = value;
+  }
+
+  public T getValue() {
+    return this.value;
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 3422d66..a7db4fd 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -22,7 +22,9 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang3.SerializationUtils;
@@ -53,11 +55,12 @@ import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.callbacks.CallbackResult;
+import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
 
 @Alpha
 public class FlowCatalog extends AbstractIdleService implements SpecCatalog, 
MutableSpecCatalog, SpecSerDe {
-
   public static final String DEFAULT_FLOWSPEC_STORE_CLASS = 
FSSpecStore.class.getCanonicalName();
 
   protected final SpecCatalogListenersList listeners;
@@ -239,7 +242,8 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     }
   }
 
-  public void put(Spec spec, boolean triggerListener) {
+  public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
+    Map<String, AddSpecResponse> responseMap = new HashMap<>();
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
       Preconditions.checkNotNull(spec);
@@ -250,16 +254,20 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
       specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
       if (triggerListener) {
-        this.listeners.onAddSpec(spec);
+        
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, 
AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> 
entry: response.getValue().getSuccesses().entrySet()) {
+          responseMap.put(entry.getKey().getName(), 
entry.getValue().getResult());
+        }
       }
     } catch (IOException e) {
       throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
     }
+    return responseMap;
   }
 
   @Override
-  public void put(Spec spec) {
-    put(spec, true);
+  public Map<String, AddSpecResponse> put(Spec spec) {
+    return put(spec, true);
   }
 
   public void remove(URI uri) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index 9a798c1..e0c11ab 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -44,7 +44,7 @@ public class SpecCatalogListenersList implements 
SpecCatalogListener, SpecCatalo
   }
 
   public SpecCatalogListenersList(Optional<Logger> log) {
-    _disp = new 
CallbacksDispatcher<SpecCatalogListener>(Optional.<ExecutorService>absent(), 
log);
+    _disp = new CallbacksDispatcher<>(Optional.<ExecutorService>absent(), log);
   }
 
   public Logger getLog() {
@@ -66,13 +66,14 @@ public class SpecCatalogListenersList implements 
SpecCatalogListener, SpecCatalo
   }
 
   @Override
-  public synchronized void onAddSpec(Spec addedSpec) {
+  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
     Preconditions.checkNotNull(addedSpec);
     try {
-      _disp.execCallbacks(new SpecCatalogListener.AddSpecCallback(addedSpec));
+      return new AddSpecResponse<>(_disp.execCallbacks(new 
AddSpecCallback(addedSpec)));
     } catch (InterruptedException e) {
       getLog().warn("onAddSpec interrupted.");
     }
+    return null;
   }
 
   @Override
@@ -102,7 +103,7 @@ public class SpecCatalogListenersList implements 
SpecCatalogListener, SpecCatalo
     _disp.close();
   }
 
-  public void callbackOneListener(Function<SpecCatalogListener, Void> callback,
+  public void callbackOneListener(Function<SpecCatalogListener, 
AddSpecResponse> callback,
       SpecCatalogListener listener) {
     try {
       _disp.execCallbacks(callback, listener);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index a842abd..c44e111 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -22,16 +22,14 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.runtime.api.FlowSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +39,15 @@ import 
com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.api.MutableSpecCatalog;
 import org.apache.gobblin.runtime.api.Spec;
@@ -57,6 +59,8 @@ import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.callbacks.CallbackResult;
+import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
 
 @Alpha
@@ -228,7 +232,9 @@ public class TopologyCatalog extends AbstractIdleService 
implements SpecCatalog,
   }
 
   @Override
-  public void put(Spec spec) {
+  public Map<String, AddSpecResponse> put(Spec spec) {
+    Map<String, AddSpecResponse> responseMap = new HashMap<>();
+
     try {
       Preconditions.checkState(state() == Service.State.RUNNING, 
String.format("%s is not running.", this.getClass().getName()));
       Preconditions.checkNotNull(spec);
@@ -236,17 +242,21 @@ public class TopologyCatalog extends AbstractIdleService 
implements SpecCatalog,
       log.info(String.format("Adding TopologySpec with URI: %s and Config: 
%s", spec.getUri(),
           ((TopologySpec) spec).getConfigAsProperties()));
         specStore.addSpec(spec);
-        this.listeners.onAddSpec(spec);
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, 
AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> 
entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), 
entry.getValue().getResult());
+      }
     } catch (IOException e) {
       throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
     }
+    return responseMap;
   }
 
   public void remove(URI uri) {
     remove(uri, new Properties());
   }
 
-    @Override
+  @Override
   public void remove(URI uri, Properties headers) {
     try {
       Preconditions.checkState(state() == Service.State.RUNNING, 
String.format("%s is not running.", this.getClass().getName()));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d96d6d2..a1f897d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -258,9 +258,15 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
           binder.bindConstant()
               
.annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE))
               .to(Boolean.TRUE);
+          binder.bindConstant()
+              
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE))
+              .to(Boolean.TRUE);
           binder.bind(RequesterService.class)
               
.annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE))
               .toInstance(new NoopRequesterService(config));
+          binder.bind(RequesterService.class)
+              
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE))
+              .toInstance(new NoopRequesterService(config));
         }
       });
       this.restliServer = EmbeddedRestliServer.builder()
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 0f51185..16bb04c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -159,7 +160,7 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
   }
 
   @Override
-  public synchronized void onAddSpec(Spec addedSpec) {
+  public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
     TopologySpec spec = (TopologySpec) addedSpec;
     log.info ("Loading topology {}", spec.toLongString());
     for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
@@ -167,6 +168,7 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
     }
 
     topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+    return new AddSpecResponse(null);
   }
 
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index db81c6a..c38460e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -274,4 +274,20 @@ public class Dag<T> {
       return this.getValue().hashCode();
     }
   }
+
+  /**
+   * @return the {@code toString()} of every node value, comma-separated inside "[" and "]"; an empty Dag gives "[]".
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("[");
+    // Write the separator before every node except the first, so nothing has to be trimmed afterwards:
+    // trimming the last character when there are no nodes would remove the opening bracket and yield "]".
+    String separator = "";
+    for (DagNode node: this.getNodes()) {
+      sb.append(separator).append(node.getValue().toString());
+      separator = ",";
+    }
+    return sb.append("]").toString();
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 3b838fe..27fb888 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.ServiceMetricNames;
@@ -153,11 +154,12 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   /** {@inheritDoc} */
   @Override
-  public void onAddSpec(Spec addedSpec) {
+  public AddSpecResponse onAddSpec(Spec addedSpec) {
     if (addedSpec instanceof TopologySpec) {
       _log.info("New Spec detected of type TopologySpec: " + addedSpec);
       this.specCompiler.onAddSpec(addedSpec);
     }
+    return new AddSpecResponse(null);
   }
 
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
@@ -193,6 +195,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     } catch (Exception e) {
       _log.error("Failed to update Spec: " + updatedSpec, e);
     }
+
   }
 
   public void orchestrate(Spec spec) throws Exception {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index d0e7737..12abf3d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -97,15 +97,20 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
 
     try {
       if (!jobScheduler.isActive() && helixManager.isPresent()) {
+        CreateResponse response = null;
         if (this.flowCatalogLocalCommit) {
           // We will handle FS I/O locally for load balance before forwarding 
to remote node.
-          this.localHandler.createFlowConfig(flowConfig, false);
+          response = this.localHandler.createFlowConfig(flowConfig, false);
         }
 
-        forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, 
FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
+        if (!flowConfig.hasExplain() || !flowConfig.isExplain()) {
+          //Forward the message to master only if it is not an "explain" 
request.
+          forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, 
FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
+        }
 
         // Do actual work on remote node, directly return success
-        return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), 
new EmptyRecord()), HttpStatus.S_201_CREATED);
+        return response == null ? new CreateResponse(new 
ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()),
+            HttpStatus.S_201_CREATED) : response;
       } else {
         return this.localHandler.createFlowConfig(flowConfig);
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ee4ecb7..e5dc41e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -49,14 +49,17 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
@@ -179,12 +182,12 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
   /** {@inheritDoc} */
   @Override
-  public void onAddSpec(Spec addedSpec) {
+  public AddSpecResponse onAddSpec(Spec addedSpec) {
     if (this.helixManager.isPresent() && 
!this.helixManager.get().isConnected()) {
       // Specs in store will be notified when Scheduler is added as listener 
to FlowCatalog, so ignore
       // .. Specs if in cluster mode and Helix is not yet initialized
       _log.info("System not yet initialized. Skipping Spec Addition: " + 
addedSpec);
-      return;
+      return null;
     }
 
     _log.info("New Flow Spec detected: " + addedSpec);
@@ -205,24 +208,46 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
               
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
         }
-
-        this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
-
-        if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-          _log.info("{} Scheduling flow spec: {} ", this.serviceName, 
addedSpec);
-          scheduleJob(jobConfig, null);
-          if (PropertiesUtils.getPropAsBoolean(jobConfig, 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-            _log.info("RunImmediately requested, hence executing FlowSpec: " + 
addedSpec);
-            this.jobExecutor.execute(new 
NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+        boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(), 
ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
+        String compiledFlow = null;
+        if (!isExplain) {
+          this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), 
addedSpec);
+
+          if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+            _log.info("{} Scheduling flow spec: {} ", this.serviceName, 
addedSpec);
+            scheduleJob(jobConfig, null);
+            if (PropertiesUtils.getPropAsBoolean(jobConfig, 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+              _log.info("RunImmediately requested, hence executing FlowSpec: " 
+ addedSpec);
+              this.jobExecutor.execute(new 
NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+            }
+          } else {
+            _log.info("No FlowSpec schedule found, so running FlowSpec: " + 
addedSpec);
+            this.jobExecutor.execute(new 
NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
           }
         } else {
-          _log.info("No FlowSpec schedule found, so running FlowSpec: " + 
addedSpec);
-          this.jobExecutor.execute(new 
NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
+          //Return a compiled flow.
+          try {
+            this.orchestrator.getSpecCompiler().awaitHealthy();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          Dag<JobExecutionPlan> dag = 
this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+          if (dag != null && !dag.isEmpty()) {
+            compiledFlow = dag.toString();
+          }
+          _log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN 
request", this.serviceName, addedSpec);
+
+          if (this.flowCatalog.isPresent()) {
+            _log.debug("{} Removing flow spec from FlowCatalog: {}", 
this.serviceName, flowSpec);
+            this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), 
false);
+          }
         }
+        return new AddSpecResponse(compiledFlow);
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName,  
addedSpec, je);
       }
     }
+    return null;
   }
 
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 6d62591..d4ac3ac 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -26,6 +26,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
 import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Data;
@@ -38,7 +39,6 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@@ -166,4 +166,13 @@ public class JobExecutionPlan {
               + "/" + jobName, null);
     }
   }
+
+  /**
+   * Render the config of the JobSpec (not the other fields of this plan) using {@link ConfigRenderOptions#concise()}.
+   * @return a valid JSON string representation of the JobSpec's config.
+   */
+  @Override
+  public String toString() {
+    return jobSpec.getConfig().root().render(ConfigRenderOptions.concise());
+  }
 }

Reply via email to