This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new aadf53d [GOBBLIN-696] Provide an "explain" option to return a
compiled flow wh…
aadf53d is described below
commit aadf53dbb8acc5a3de0249d3695384fdefa4eb35
Author: suvasude <[email protected]>
AuthorDate: Sun Mar 17 19:52:52 2019 -0700
[GOBBLIN-696] Provide an "explain" option to return a compiled flow wh…
Closes #2567 from sv2000/dagPlan1
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
...che.gobblin.service.flowconfigsV2.restspec.json | 3 ++
.../org/apache/gobblin/service/FlowConfig.pdsc | 6 +++
...pache.gobblin.service.flowconfigs.snapshot.json | 5 +++
...he.gobblin.service.flowconfigsV2.snapshot.json} | 41 ++++++++++++-----
.../apache/gobblin/service/FlowConfigV2Client.java | 10 ++---
.../apache/gobblin/service/FlowConfigV2Test.java | 7 ++-
.../service/FlowConfigResourceLocalHandler.java | 11 +++++
.../service/FlowConfigV2ResourceLocalHandler.java | 35 ++++++++++++---
.../gobblin/service/FlowConfigsV2Resource.java | 40 ++++++++++++++---
.../gobblin/runtime/api/MutableSpecCatalog.java | 14 ++++--
.../apache/gobblin/runtime/api/SpecCatalog.java | 4 +-
.../gobblin/runtime/api/SpecCatalogListener.java | 19 +++++---
.../runtime/spec_catalog/AddSpecResponse.java | 34 +++++++++++++++
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 18 +++++---
.../spec_catalog/SpecCatalogListenersList.java | 9 ++--
.../runtime/spec_catalog/TopologyCatalog.java | 24 +++++++---
.../modules/core/GobblinServiceManager.java | 6 +++
.../modules/flow/BaseFlowToJobSpecCompiler.java | 4 +-
.../gobblin/service/modules/flowgraph/Dag.java | 16 +++++++
.../modules/orchestration/Orchestrator.java | 5 ++-
.../GobblinServiceFlowConfigResourceHandler.java | 11 +++--
.../scheduler/GobblinServiceJobScheduler.java | 51 ++++++++++++++++------
.../service/modules/spec/JobExecutionPlan.java | 11 ++++-
24 files changed, 310 insertions(+), 75 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 9004cff..3bcbb69 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -135,6 +135,7 @@ public class ConfigurationKeys {
public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
public static final String FLOW_APPLY_RETENTION = "flow.applyRetention";
public static final String FLOW_APPLY_INPUT_RETENTION =
"flow.applyInputRetention";
+ public static final String FLOW_EXPLAIN_KEY = "flow.explain";
/**
* Common topology configuration properties.
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index e45718e..8aa0726 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -12,6 +12,9 @@
},
"supports" : [ "create", "delete", "get", "update" ],
"methods" : [ {
+ "annotations" : {
+ "returnEntity" : { }
+ },
"method" : "create",
"doc" : "Create a flow configuration that the service will forward to
execution instances for execution"
}, {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
index 19ac690..09af4f1 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdsc
@@ -24,6 +24,12 @@
}
},
{
+ "name" : "explain",
+ "type" : "boolean",
+ "default" : false,
+ "doc" : "Return the compiled flow as a string. If enabled, the flow is
not added."
+ },
+ {
"name" : "properties",
"type" : { "type" : "map", "values" : "string" },
"doc" : "Properties for the flow. These properties are passed to the
compiled Gobblin jobs."
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
index 6106b7a..9485f8d 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
@@ -65,6 +65,11 @@
"org.apache.gobblin.service.validator.TemplateUriValidator" : { }
}
}, {
+ "name" : "explain",
+ "type" : "boolean",
+ "doc" : "Return the compiled flow as a string. If enabled, the flow is
not added.",
+ "default" : false
+ }, {
"name" : "properties",
"type" : {
"type" : "map",
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
similarity index 77%
copy from
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
copy to
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 6106b7a..99ea6fd 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -65,6 +65,11 @@
"org.apache.gobblin.service.validator.TemplateUriValidator" : { }
}
}, {
+ "name" : "explain",
+ "type" : "boolean",
+ "doc" : "Return the compiled flow as a string. If enabled, the flow is
not added.",
+ "default" : false
+ }, {
"name" : "properties",
"type" : {
"type" : "map",
@@ -74,28 +79,40 @@
} ]
}, {
"type" : "record",
- "name" : "EmptyRecord",
- "namespace" : "com.linkedin.restli.common",
- "doc" : "An literally empty record. Intended as a marker to indicate the
absence of content where a record type is required. If used the underlying
DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.
For example, CreateRequest extends Request<EmptyRecord> to indicate it has no
response body. Also, a ComplexKeyResource implementation that has no ParamKey
should have a signature like XyzResource implements ComplexKeyResource<XyzKey,
EmptyRecord, Xyz>.",
- "fields" : [ ],
- "validate" : {
- "com.linkedin.restli.common.EmptyRecordValidator" : { }
- }
+ "name" : "FlowStatusId",
+ "namespace" : "org.apache.gobblin.service",
+ "doc" : "Identifier for a specific execution of a flow",
+ "fields" : [ {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "Name of the flow"
+ }, {
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "Group of the flow. This defines the namespace for the flow."
+ }, {
+ "name" : "flowExecutionId",
+ "type" : "long",
+ "doc" : "Execution id for the flow"
+ } ]
} ],
"schema" : {
- "name" : "flowconfigs",
+ "name" : "flowconfigsV2",
"namespace" : "org.apache.gobblin.service",
- "path" : "/flowconfigs",
+ "path" : "/flowconfigsV2",
"schema" : "org.apache.gobblin.service.FlowConfig",
- "doc" : "Resource for handling flow configuration requests\n\ngenerated
from: org.apache.gobblin.service.FlowConfigsResource",
+ "doc" : "Resource for handling flow configuration requests\n\ngenerated
from: org.apache.gobblin.service.FlowConfigsV2Resource",
"collection" : {
"identifier" : {
"name" : "id",
"type" : "org.apache.gobblin.service.FlowId",
- "params" : "com.linkedin.restli.common.EmptyRecord"
+ "params" : "org.apache.gobblin.service.FlowStatusId"
},
"supports" : [ "create", "delete", "get", "update" ],
"methods" : [ {
+ "annotations" : {
+ "returnEntity" : { }
+ },
"method" : "create",
"doc" : "Create a flow configuration that the service will forward to
execution instances for execution"
}, {
@@ -109,7 +126,7 @@
"doc" : "Delete a configured flow. Running flows are not affected. The
schedule will be removed for scheduled flows."
} ],
"entity" : {
- "path" : "/flowconfigs/{id}"
+ "path" : "/flowconfigsV2/{id}"
}
}
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 1471883..94af118 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -35,7 +35,7 @@ import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
-import com.linkedin.restli.client.CreateIdRequest;
+import com.linkedin.restli.client.CreateIdEntityRequest;
import com.linkedin.restli.client.DeleteRequest;
import com.linkedin.restli.client.GetRequest;
import com.linkedin.restli.client.Response;
@@ -44,7 +44,7 @@ import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.UpdateRequest;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.IdResponse;
+import com.linkedin.restli.common.IdEntityResponse;
/**
@@ -99,9 +99,9 @@ public class FlowConfigV2Client implements Closeable {
LOG.debug("createFlowConfig with groupName " +
flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName());
- CreateIdRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig>
request =
- _flowconfigsV2RequestBuilders.create().input(flowConfig).build();
- ResponseFuture<IdResponse<ComplexResourceKey<FlowId, FlowStatusId>>>
flowConfigResponseFuture =
+ CreateIdEntityRequest<ComplexResourceKey<FlowId, FlowStatusId>,
FlowConfig> request =
+ _flowconfigsV2RequestBuilders.createAndGet().input(flowConfig).build();
+ ResponseFuture<IdEntityResponse<ComplexResourceKey<FlowId, FlowStatusId>,
FlowConfig>> flowConfigResponseFuture =
_restClient.get().sendRequest(request);
return
createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString());
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 821b947..9b43875 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -37,6 +37,7 @@ import com.google.inject.name.Names;
import com.linkedin.data.template.StringMap;
import com.linkedin.restli.server.resources.BaseResource;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -76,10 +77,12 @@ public class FlowConfigV2Test {
Injector injector = Guice.createInjector(new Module() {
@Override
public void configure(Binder binder) {
-
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(new
FlowConfigV2ResourceLocalHandler(flowCatalog));
+
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new
FlowConfigV2ResourceLocalHandler(flowCatalog));
// indicate that we are in unit testing since the resource is being
blocked until flow catalog changes have
// been made
-
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
+
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
+
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(new
NoopRequesterService(
+ ConfigFactory.empty()));
}
});
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 8ad0999..405e8e3 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -29,6 +29,7 @@ import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -108,6 +109,12 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
*/
public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean
triggerListener) throws FlowConfigLoggedException {
log.info("[GAAS-REST] Create called with flowGroup " +
flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName());
+
+ if (flowConfig.hasExplain()) {
+ //Return Error if FlowConfig has explain set. Explain request is only
valid for v2 FlowConfig.
+ return new CreateResponse(new
RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "FlowConfig with explain
not supported."));
+ }
+
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
// Existence of a flow spec in the flow catalog implies that the flow is
currently running.
// If the new flow spec has a schedule we should allow submission of the
new flow to accept the new schedule.
@@ -204,6 +211,10 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
String.valueOf(System.currentTimeMillis()));
}
+ if (flowConfig.hasExplain()) {
+ configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXPLAIN_KEY,
flowConfig.isExplain());
+ }
+
Config config = configBuilder.build();
Config configWithFallback;
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index bd77858..c0c7fa6 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -15,17 +15,25 @@
* limitations under the License.
*/
package org.apache.gobblin.service;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import com.linkedin.data.template.StringMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.HttpStatus;
-import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.CreateKVResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@Slf4j
public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+ public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS =
"org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
+
public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
super(flowCatalog);
}
@@ -33,10 +41,14 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
/**
* Add flowConfig locally and trigger all listeners iff @param
triggerListener is set to true
*/
- public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean
triggerListener) throws FlowConfigLoggedException {
- log.info("[GAAS-REST] Create called with flowGroup " +
flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName());
+ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean
triggerListener) throws FlowConfigLoggedException {
+ String createLog = "[GAAS-REST] Create called with flowGroup " +
flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName();
+ if (flowConfig.hasExplain()) {
+ createLog += " explain " + Boolean.toString(flowConfig.isExplain());
+ }
+ log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- this.flowCatalog.put(flowSpec, triggerListener);
+ Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec,
triggerListener);
FlowStatusId flowStatusId = new FlowStatusId()
.setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
@@ -45,6 +57,19 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
} else {
flowStatusId.setFlowExecutionId(-1L);
}
- return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), HttpStatus.S_201_CREATED);
+ HttpStatus httpStatus = HttpStatus.S_201_CREATED;
+
+ if (flowConfig.hasExplain() && flowConfig.isExplain()) {
+ //This is an Explain request. So no resource is actually created.
+ //Enrich original FlowConfig entity by adding the compiledFlow to the
properties map.
+ StringMap props = flowConfig.getProperties();
+ AddSpecResponse<String> addSpecResponse =
responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+ props.put("gobblin.flow.compiled",
+ addSpecResponse != null ?
StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
+ flowConfig.setProperties(props);
+ //Return response with 200 status code, since no resource is actually
created.
+ httpStatus = HttpStatus.S_200_OK;
+ }
+ return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), flowConfig, httpStatus);
}
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index fa6112c..2adf4a6 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -15,19 +15,29 @@
* limitations under the License.
*/
package org.apache.gobblin.service;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.RestLiCollection;
+import com.linkedin.restli.server.annotations.ReturnEntity;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+
/**
* Resource for handling flow configuration requests
*/
@@ -35,6 +45,9 @@ import
com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId,
FlowStatusId, FlowConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(FlowConfigsV2Resource.class);
public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME =
"flowConfigsV2ResourceHandler";
+ public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
+ public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
+
private static final Set<String> ALLOWED_METADATA =
ImmutableSet.of("delete.state.store");
@@ -45,10 +58,15 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
@Named(FLOW_CONFIG_GENERATOR_INJECT_NAME)
private FlowConfigsResourceHandler flowConfigsResourceHandler;
+ // For getting who sends the request
+ @Inject
+ @Named(INJECT_REQUESTER_SERVICE)
+ private RequesterService requesterService;
+
// For blocking use of this resource until it is ready
@Inject
- @Named("readyToUse")
- private Boolean readyToUse = Boolean.FALSE;
+ @Named(INJECT_READY_TO_USE)
+ private Boolean readyToUse;
public FlowConfigsV2Resource() {
}
@@ -71,9 +89,19 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
* @param flowConfig flow configuration
* @return {@link CreateResponse}
*/
+ @ReturnEntity
@Override
- public CreateResponse create(FlowConfig flowConfig) {
- return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
+ public CreateKVResponse create(FlowConfig flowConfig) {
+ List<ServiceRequester> requestorList =
this.requesterService.findRequesters(this);
+ try {
+ String serialized = this.requesterService.serialize(requestorList);
+ flowConfig.getProperties().put(RequesterService.REQUESTER_LIST,
serialized);
+ LOG.info("Rest requester list is " + serialized);
+ } catch (IOException e) {
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
+ "cannot get who is the requester", e);
+ }
+ return (CreateKVResponse)
this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
}
/**
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 7a3e946..2ddbf7e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -19,18 +19,20 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareTimer;
-
import com.google.common.base.Optional;
import com.typesafe.config.Config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+
/**
* A {@link SpecCatalog} that can have its {@link Collection} of {@link Spec}s
modified
@@ -41,8 +43,12 @@ public interface MutableSpecCatalog extends SpecCatalog {
/**
* Registers a new {@link Spec}. If a {@link Spec} with the same {@link
Spec#getUri()} exists,
* it will be replaced.
+ * @param spec
+ * @return a map containing an entry for each {@link SpecCatalogListener} of
the {@link SpecCatalog}, for which an action is triggered
+ * on adding a {@link Spec} to the {@link SpecCatalog}. The key for each
entry is the name of the {@link SpecCatalogListener}
+ * and the value is the result of the action taken by the listener
returned as an instance of {@link AddSpecResponse}.
* */
- public void put(Spec spec);
+ public Map<String, AddSpecResponse> put(Spec spec);
/**
* Removes an existing {@link Spec} with the given URI.
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 0312c4e..05f63a0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -40,6 +40,7 @@ import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.util.ConfigUtils;
@@ -119,9 +120,10 @@ public interface SpecCatalog extends
SpecCatalogListenersContainer, Instrumentab
Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet),
System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
- @Override public void onAddSpec(Spec addedSpec) {
+ @Override public AddSpecResponse onAddSpec(Spec addedSpec) {
this.totalAddedSpecs.incrementAndGet();
submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
+ return new AddSpecResponse(null);
}
private void submitTrackingEvent(Spec spec, String operType) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
index 1448231..67f2e39 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
@@ -22,12 +22,13 @@ import java.util.Properties;
import com.google.common.base.Objects;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.util.callbacks.Callback;
public interface SpecCatalogListener {
/** Invoked when a new {@link Spec} is added to the catalog and for all
pre-existing specs on registration
* of the listener.*/
- void onAddSpec(Spec addedSpec);
+ AddSpecResponse onAddSpec(Spec addedSpec);
/**
* Invoked when a {@link Spec} gets removed from the catalog.
@@ -40,16 +41,15 @@ public interface SpecCatalogListener {
public void onUpdateSpec(Spec updatedSpec);
/** A standard implementation of onAddSpec as a functional object */
- public static class AddSpecCallback extends Callback<SpecCatalogListener,
Void> {
+ public static class AddSpecCallback extends Callback<SpecCatalogListener,
AddSpecResponse> {
private final Spec _addedSpec;
public AddSpecCallback(Spec addedSpec) {
super(Objects.toStringHelper("onAddSpec").add("addedSpec",
addedSpec).toString());
_addedSpec = addedSpec;
}
- @Override public Void apply(SpecCatalogListener listener) {
- listener.onAddSpec(_addedSpec);
- return null;
+ @Override public AddSpecResponse apply(SpecCatalogListener listener) {
+ return listener.onAddSpec(_addedSpec);
}
}
@@ -89,4 +89,13 @@ public interface SpecCatalogListener {
return null;
}
}
+
+ /**
+ * A default implementation to return the name of the {@link
SpecCatalogListener}.
+ * @return
+ */
+ default String getName() {
+ return getClass().getName();
+ }
+
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/AddSpecResponse.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/AddSpecResponse.java
new file mode 100644
index 0000000..9d83e34
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/AddSpecResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.spec_catalog;
+
+/**
+ * A generic class that allows listeners of a {@link
org.apache.gobblin.runtime.api.SpecCatalog} to return a response when a
+ * {@link org.apache.gobblin.runtime.api.Spec} is added to the {@link
org.apache.gobblin.runtime.api.SpecCatalog}.
+ * @param <T>
+ */
+public class AddSpecResponse<T> {
+ private T value;
+
+ public AddSpecResponse(T value) {
+ this.value = value;
+ }
+
+ public T getValue() {
+ return this.value;
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 3422d66..a7db4fd 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -22,7 +22,9 @@ import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.SerializationUtils;
@@ -53,11 +55,12 @@ import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.callbacks.CallbackResult;
+import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
@Alpha
public class FlowCatalog extends AbstractIdleService implements SpecCatalog,
MutableSpecCatalog, SpecSerDe {
-
public static final String DEFAULT_FLOWSPEC_STORE_CLASS =
FSSpecStore.class.getCanonicalName();
protected final SpecCatalogListenersList listeners;
@@ -239,7 +242,8 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
}
- public void put(Spec spec, boolean triggerListener) {
+ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
+ Map<String, AddSpecResponse> responseMap = new HashMap<>();
try {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
Preconditions.checkNotNull(spec);
@@ -250,16 +254,20 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
specStore.addSpec(spec);
metrics.updatePutSpecTime(startTime);
if (triggerListener) {
- this.listeners.onAddSpec(spec);
+
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener,
AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry: response.getValue().getSuccesses().entrySet()) {
+ responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
+ }
}
} catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
}
+ return responseMap;
}
@Override
- public void put(Spec spec) {
- put(spec, true);
+ public Map<String, AddSpecResponse> put(Spec spec) {
+ return put(spec, true);
}
public void remove(URI uri) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index 9a798c1..e0c11ab 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -44,7 +44,7 @@ public class SpecCatalogListenersList implements
SpecCatalogListener, SpecCatalo
}
public SpecCatalogListenersList(Optional<Logger> log) {
- _disp = new
CallbacksDispatcher<SpecCatalogListener>(Optional.<ExecutorService>absent(),
log);
+ _disp = new CallbacksDispatcher<>(Optional.<ExecutorService>absent(), log);
}
public Logger getLog() {
@@ -66,13 +66,14 @@ public class SpecCatalogListenersList implements
SpecCatalogListener, SpecCatalo
}
@Override
- public synchronized void onAddSpec(Spec addedSpec) {
+ public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
Preconditions.checkNotNull(addedSpec);
try {
- _disp.execCallbacks(new SpecCatalogListener.AddSpecCallback(addedSpec));
+ return new AddSpecResponse<>(_disp.execCallbacks(new
AddSpecCallback(addedSpec)));
} catch (InterruptedException e) {
getLog().warn("onAddSpec interrupted.");
}
+ return null;
}
@Override
@@ -102,7 +103,7 @@ public class SpecCatalogListenersList implements
SpecCatalogListener, SpecCatalo
_disp.close();
}
- public void callbackOneListener(Function<SpecCatalogListener, Void> callback,
+ public void callbackOneListener(Function<SpecCatalogListener,
AddSpecResponse> callback,
SpecCatalogListener listener) {
try {
_disp.execCallbacks(callback, listener);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index a842abd..c44e111 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -22,16 +22,14 @@ import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-import javax.annotation.Nonnull;
-import lombok.Getter;
-
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.runtime.api.FlowSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +39,15 @@ import
com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.MutableSpecCatalog;
import org.apache.gobblin.runtime.api.Spec;
@@ -57,6 +59,8 @@ import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.callbacks.CallbackResult;
+import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
@Alpha
@@ -228,7 +232,9 @@ public class TopologyCatalog extends AbstractIdleService
implements SpecCatalog,
}
@Override
- public void put(Spec spec) {
+ public Map<String, AddSpecResponse> put(Spec spec) {
+ Map<String, AddSpecResponse> responseMap = new HashMap<>();
+
try {
Preconditions.checkState(state() == Service.State.RUNNING,
String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(spec);
@@ -236,17 +242,21 @@ public class TopologyCatalog extends AbstractIdleService
implements SpecCatalog,
log.info(String.format("Adding TopologySpec with URI: %s and Config:
%s", spec.getUri(),
((TopologySpec) spec).getConfigAsProperties()));
specStore.addSpec(spec);
- this.listeners.onAddSpec(spec);
+ AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener,
AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry: response.getValue().getSuccesses().entrySet()) {
+ responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
+ }
} catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
}
+ return responseMap;
}
public void remove(URI uri) {
remove(uri, new Properties());
}
- @Override
+ @Override
public void remove(URI uri, Properties headers) {
try {
Preconditions.checkState(state() == Service.State.RUNNING,
String.format("%s is not running.", this.getClass().getName()));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d96d6d2..a1f897d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -258,9 +258,15 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
binder.bindConstant()
.annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE))
.to(Boolean.TRUE);
+ binder.bindConstant()
+
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE))
+ .to(Boolean.TRUE);
binder.bind(RequesterService.class)
.annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE))
.toInstance(new NoopRequesterService(config));
+ binder.bind(RequesterService.class)
+
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE))
+ .toInstance(new NoopRequesterService(config));
}
});
this.restliServer = EmbeddedRestliServer.builder()
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 0f51185..16bb04c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -159,7 +160,7 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
}
@Override
- public synchronized void onAddSpec(Spec addedSpec) {
+ public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
TopologySpec spec = (TopologySpec) addedSpec;
log.info ("Loading topology {}", spec.toLongString());
for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
@@ -167,6 +168,7 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
}
topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ return new AddSpecResponse(null);
}
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index db81c6a..c38460e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -274,4 +274,20 @@ public class Dag<T> {
return this.getValue().hashCode();
}
}
+
+ /**
+ * @return every node value's toString(), comma-separated inside "[...]" ("[]" for a Dag without nodes);
+ * a JSON array provided each node value renders itself as JSON.
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[");
+ String separator = ""; // empty before the first element, so no trailing comma has to be trimmed
+ for (DagNode node: this.getNodes()) {
+ sb.append(separator).append(node.getValue().toString());
+ separator = ",";
+ }
+ sb.append("]");
+ return sb.toString();
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 3b838fe..27fb888 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.ServiceMetricNames;
@@ -153,11 +154,12 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
/** {@inheritDoc} */
@Override
- public void onAddSpec(Spec addedSpec) {
+ public AddSpecResponse onAddSpec(Spec addedSpec) {
if (addedSpec instanceof TopologySpec) {
_log.info("New Spec detected of type TopologySpec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
}
+ return new AddSpecResponse(null);
}
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
@@ -193,6 +195,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
} catch (Exception e) {
_log.error("Failed to update Spec: " + updatedSpec, e);
}
+
}
public void orchestrate(Spec spec) throws Exception {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index d0e7737..12abf3d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -97,15 +97,20 @@ public class GobblinServiceFlowConfigResourceHandler
implements FlowConfigsResou
try {
if (!jobScheduler.isActive() && helixManager.isPresent()) {
+ CreateResponse response = null;
if (this.flowCatalogLocalCommit) {
// We will handle FS I/O locally for load balance before forwarding
to remote node.
- this.localHandler.createFlowConfig(flowConfig, false);
+ response = this.localHandler.createFlowConfig(flowConfig, false);
}
- forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD,
FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
+ if (!flowConfig.hasExplain() || !flowConfig.isExplain()) {
+ //Forward the message to master only if it is not an "explain"
request.
+ forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD,
FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
+ }
// Do actual work on remote node, directly return success
- return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(),
new EmptyRecord()), HttpStatus.S_201_CREATED);
+ return response == null ? new CreateResponse(new
ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()),
+ HttpStatus.S_201_CREATED) : response;
} else {
return this.localHandler.createFlowConfig(flowConfig);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ee4ecb7..e5dc41e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -49,14 +49,17 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -179,12 +182,12 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
/** {@inheritDoc} */
@Override
- public void onAddSpec(Spec addedSpec) {
+ public AddSpecResponse onAddSpec(Spec addedSpec) {
if (this.helixManager.isPresent() &&
!this.helixManager.get().isConnected()) {
// Specs in store will be notified when Scheduler is added as listener
to FlowCatalog, so ignore
// .. Specs if in cluster mode and Helix is not yet initialized
_log.info("System not yet initialized. Skipping Spec Addition: " +
addedSpec);
- return;
+ return null;
}
_log.info("New Flow Spec detected: " + addedSpec);
@@ -205,24 +208,46 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
}
-
- this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
-
- if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- _log.info("{} Scheduling flow spec: {} ", this.serviceName,
addedSpec);
- scheduleJob(jobConfig, null);
- if (PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- _log.info("RunImmediately requested, hence executing FlowSpec: " +
addedSpec);
- this.jobExecutor.execute(new
NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+ boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(),
ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
+ String compiledFlow = null;
+ if (!isExplain) {
+ this.scheduledFlowSpecs.put(addedSpec.getUri().toString(),
addedSpec);
+
+ if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ _log.info("{} Scheduling flow spec: {} ", this.serviceName,
addedSpec);
+ scheduleJob(jobConfig, null);
+ if (PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ _log.info("RunImmediately requested, hence executing FlowSpec: "
+ addedSpec);
+ this.jobExecutor.execute(new
NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+ }
+ } else {
+ _log.info("No FlowSpec schedule found, so running FlowSpec: " +
addedSpec);
+ this.jobExecutor.execute(new
NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
}
} else {
- _log.info("No FlowSpec schedule found, so running FlowSpec: " +
addedSpec);
- this.jobExecutor.execute(new
NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
+ //Return a compiled flow.
+ try {
+ this.orchestrator.getSpecCompiler().awaitHealthy();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ Dag<JobExecutionPlan> dag =
this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ if (dag != null && !dag.isEmpty()) {
+ compiledFlow = dag.toString();
+ }
+ _log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN
request", this.serviceName, addedSpec);
+
+ if (this.flowCatalog.isPresent()) {
+ _log.debug("{} Removing flow spec from FlowCatalog: {}",
this.serviceName, flowSpec);
+ this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(),
false);
+ }
}
+ return new AddSpecResponse(compiledFlow);
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName,
addedSpec, je);
}
}
+ return null;
}
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 6d62591..d4ac3ac 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -26,6 +26,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValueFactory;
import lombok.Data;
@@ -38,7 +39,6 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@@ -166,4 +166,13 @@ public class JobExecutionPlan {
+ "/" + jobName, null);
}
}
+
+ /**
+ * Render the JobSpec into a JSON string.
+ * @return a valid JSON string representation of the JobSpec.
+ */
+ @Override
+ public String toString() {
+ return jobSpec.getConfig().root().render(ConfigRenderOptions.concise());
+ }
}