Repository: incubator-gobblin Updated Branches: refs/heads/master c2d59a1cb -> 1896d7fab
[GOBBLIN-538] Flow config v2 resource Closes #2431 from arjun4084346/FlowConfigV2Resource Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1896d7fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1896d7fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1896d7fa Branch: refs/heads/master Commit: 1896d7fab5ec43ac248fdb5bebf0218bed156d00 Parents: c2d59a1 Author: Arjun <[email protected]> Authored: Fri Sep 7 17:29:14 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Sep 7 17:29:14 2018 -0700 ---------------------------------------------------------------------- ....gobblin.service.flowconfigsV2.restspec.json | 31 +++ .../gobblin/service/FlowConfigV2Client.java | 213 +++++++++++++++++++ .../gobblin/service/FlowConfigV2Test.java | 134 ++++++++++++ .../service/FlowConfigResourceLocalHandler.java | 10 +- .../FlowConfigV2ResourceLocalHandler.java | 50 +++++ .../gobblin/service/FlowConfigsV2Resource.java | 122 +++++++++++ .../modules/core/GobblinServiceManager.java | 11 + .../modules/flow/BaseFlowToJobSpecCompiler.java | 34 +-- .../gobblin/service/modules/flow/FlowUtils.java | 36 ++++ .../flow/MultiHopsFlowToJobSpecCompiler.java | 47 ++-- .../pathfinder/AbstractPathFinder.java | 4 +- 11 files changed, 649 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json new file mode 100644 index 0000000..e45718e --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json @@ -0,0 +1,31 @@ +{ + "name" : "flowconfigsV2", + "namespace" : "org.apache.gobblin.service", + "path" : "/flowconfigsV2", + "schema" : "org.apache.gobblin.service.FlowConfig", + "doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsV2Resource", + "collection" : { + "identifier" : { + "name" : "id", + "type" : "org.apache.gobblin.service.FlowId", + "params" : "org.apache.gobblin.service.FlowStatusId" + }, + "supports" : [ "create", "delete", "get", "update" ], + "methods" : [ { + "method" : "create", + "doc" : "Create a flow configuration that the service will forward to execution instances for execution" + }, { + "method" : "get", + "doc" : "Retrieve the flow configuration with the given key" + }, { + "method" : "update", + "doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist." + }, { + "method" : "delete", + "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows." 
+ } ], + "entity" : { + "path" : "/flowconfigsV2/{id}" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java new file mode 100644 index 0000000..1471883 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.linkedin.common.callback.FutureCallback; +import com.linkedin.common.util.None; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.r2.transport.common.Client; +import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter; +import com.linkedin.r2.transport.http.client.HttpClientFactory; +import com.linkedin.restli.client.CreateIdRequest; +import com.linkedin.restli.client.DeleteRequest; +import com.linkedin.restli.client.GetRequest; +import com.linkedin.restli.client.Response; +import com.linkedin.restli.client.ResponseFuture; +import com.linkedin.restli.client.RestClient; +import com.linkedin.restli.client.UpdateRequest; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.common.IdResponse; + + +/** + * Flow Configuration client for REST flow configuration server + */ +public class FlowConfigV2Client implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(FlowConfigV2Client.class); + + private Optional<HttpClientFactory> _httpClientFactory; + private Optional<RestClient> _restClient; + private final FlowconfigsV2RequestBuilders _flowconfigsV2RequestBuilders; + public static final String DELETE_STATE_STORE_KEY = "delete.state.store"; + private static final Pattern flowStatusIdParams = Pattern.compile(".*params:\\((?<flowStatusIdParams>.*?)\\)"); + + /** + * Construct a {@link FlowConfigV2Client} to communicate with http flow config server at URI serverUri + * @param serverUri address and port of the REST server + */ + public FlowConfigV2Client(String serverUri) { + 
LOG.debug("FlowConfigClient with serverUri " + serverUri); + + _httpClientFactory = Optional.of(new HttpClientFactory()); + Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap())); + _restClient = Optional.of(new RestClient(r2Client, serverUri)); + + _flowconfigsV2RequestBuilders = new FlowconfigsV2RequestBuilders(); + } + + /** + * Construct a {@link FlowConfigV2Client} to communicate with http flow config server at URI serverUri + * @param restClient restClient to send restli request + */ + public FlowConfigV2Client(RestClient restClient) { + LOG.debug("FlowConfigV2Client with restClient " + restClient); + + _httpClientFactory = Optional.absent(); + _restClient = Optional.of(restClient); + + _flowconfigsV2RequestBuilders = new FlowconfigsV2RequestBuilders(); + } + + /** + * Create a flow configuration + * It differs from {@link FlowConfigClient} in a way that it returns FlowStatusId, + * which can be used to find the FlowExecutionId + * @param flowConfig FlowConfig to be used to create the flow + * @return FlowStatusId + * @throws RemoteInvocationException + */ + public FlowStatusId createFlowConfig(FlowConfig flowConfig) + throws RemoteInvocationException { + LOG.debug("createFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " + + flowConfig.getId().getFlowName()); + + CreateIdRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> request = + _flowconfigsV2RequestBuilders.create().input(flowConfig).build(); + ResponseFuture<IdResponse<ComplexResourceKey<FlowId, FlowStatusId>>> flowConfigResponseFuture = + _restClient.get().sendRequest(request); + + return createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString()); + } + + private FlowStatusId createFlowStatusId(String locationHeader) { + Matcher matcher = flowStatusIdParams.matcher(locationHeader); + matcher.find(); + String allFields = matcher.group("flowStatusIdParams"); + String[] 
flowStatusIdParams = allFields.split(","); + Map<String, String> paramsMap = new HashMap<>(); + for (String flowStatusIdParam : flowStatusIdParams) { + paramsMap.put(flowStatusIdParam.split(":")[0], flowStatusIdParam.split(":")[1]); + } + FlowStatusId flowStatusId = new FlowStatusId() + .setFlowName(paramsMap.get("flowName")) + .setFlowGroup(paramsMap.get("flowGroup")); + if (paramsMap.containsKey("flowExecutionId")) { + flowStatusId.setFlowExecutionId(Long.parseLong(paramsMap.get("flowExecutionId"))); + } + return flowStatusId; + } + + /** + * Update a flow configuration + * @param flowConfig flow configuration attributes + * @throws RemoteInvocationException + */ + public void updateFlowConfig(FlowConfig flowConfig) + throws RemoteInvocationException { + LOG.debug("updateFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " + + flowConfig.getId().getFlowName()); + + FlowId flowId = new FlowId().setFlowGroup(flowConfig.getId().getFlowGroup()) + .setFlowName(flowConfig.getId().getFlowName()); + + UpdateRequest<FlowConfig> updateRequest = + _flowconfigsV2RequestBuilders.update().id(new ComplexResourceKey<>(flowId, new FlowStatusId())) + .input(flowConfig).build(); + + ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(updateRequest); + + response.getResponse(); + } + + /** + * Get a flow configuration + * @param flowId identifier of flow configuration to get + * @return a {@link FlowConfig} with the flow configuration + * @throws RemoteInvocationException + */ + public FlowConfig getFlowConfig(FlowId flowId) + throws RemoteInvocationException { + LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " + + flowId.getFlowName()); + + GetRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.get() + .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build(); + + Response<FlowConfig> response = + _restClient.get().sendRequest(getRequest).getResponse(); + return response.getEntity(); + } + 
+ /** + * Delete a flow configuration + * @param flowId identifier of flow configuration to delete + * @throws RemoteInvocationException + */ + public void deleteFlowConfig(FlowId flowId) + throws RemoteInvocationException { + LOG.debug("deleteFlowConfig with groupName {}, flowName {}", flowId.getFlowGroup(), flowId.getFlowName()); + + DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete() + .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build(); + ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest); + + response.getResponse(); + } + + /** + * Delete a flow configuration + * @param flowId identifier of flow configuration to delete + * @throws RemoteInvocationException + */ + public void deleteFlowConfigWithStateStore(FlowId flowId) + throws RemoteInvocationException { + LOG.debug("deleteFlowConfig and state store with groupName " + flowId.getFlowGroup() + " flowName " + + flowId.getFlowName()); + + DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete() + .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).setHeader(DELETE_STATE_STORE_KEY, Boolean.TRUE.toString()).build(); + ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest); + + response.getResponse(); + } + + @Override + public void close() + throws IOException { + if (_restClient.isPresent()) { + _restClient.get().shutdown(new FutureCallback<None>()); + } + + if (_httpClientFactory.isPresent()) { + _httpClientFactory.get().shutdown(new FutureCallback<None>()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java ---------------------------------------------------------------------- diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java new file mode 100644 index 0000000..821b947 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.io.File; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.linkedin.data.template.StringMap; +import com.linkedin.restli.server.resources.BaseResource; +import com.typesafe.config.Config; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.restli.EmbeddedRestliServer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; + + +@Test(groups = { "gobblin.service" }) +public class FlowConfigV2Test { + private FlowConfigV2Client _client; + private EmbeddedRestliServer _server; + private File _testDirectory; + + private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigTest/"; + private static final String TEST_GROUP_NAME = "testGroup1"; + private static final String TEST_FLOW_NAME = "testFlow1"; + private static final String TEST_SCHEDULE = "0 1/0 * ? 
* *"; + private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template"; + + @BeforeClass + public void setUp() throws Exception { + ConfigBuilder configBuilder = ConfigBuilder.create(); + + _testDirectory = Files.createTempDir(); + + configBuilder + .addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath()) + .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR); + cleanUpDir(TEST_SPEC_STORE_DIR); + + Config config = configBuilder.build(); + final FlowCatalog flowCatalog = new FlowCatalog(config); + + flowCatalog.startAsync(); + flowCatalog.awaitRunning(); + + Injector injector = Guice.createInjector(new Module() { + @Override + public void configure(Binder binder) { + binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog)); + // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have + // been made + binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); + } + }); + + _server = EmbeddedRestliServer.builder().resources( + Lists.<Class<? 
extends BaseResource>>newArrayList(FlowConfigsV2Resource.class)).injector(injector).build(); + + _server.startAsync(); + _server.awaitRunning(); + + _client = + new FlowConfigV2Client(String.format("http://localhost:%s/", _server.getPort())); + } + + protected void cleanUpDir(String dir) throws Exception { + File specStoreDir = new File(dir); + if (specStoreDir.exists()) { + FileUtils.deleteDirectory(specStoreDir); + } + } + + @Test + public void testCheckFlowExecutionId() throws Exception { + Map<String, String> flowProperties = Maps.newHashMap(); + flowProperties.put("param1", "value1"); + + FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME)) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + FlowStatusId flowStatusId =_client.createFlowConfig(flowConfig); + Assert.assertEquals(TEST_GROUP_NAME, flowStatusId.getFlowGroup()); + Assert.assertEquals(TEST_FLOW_NAME, flowStatusId.getFlowName()); + Assert.assertTrue(flowStatusId.getFlowExecutionId() != -1); + + flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME)) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)) + .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true)); + Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(), -1L); + } + + + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + if (_client != null) { + _client.close(); + } + if (_server != null) { + _server.stopAsync(); + _server.awaitTerminated(); + } + _testDirectory.delete(); + cleanUpDir(TEST_SPEC_STORE_DIR); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java 
---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java index 0007b6a..52cc0c7 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -48,7 +48,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; @Slf4j public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler { @Getter - private FlowCatalog flowCatalog; + protected FlowCatalog flowCatalog; public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) { this.flowCatalog = flowCatalog; } @@ -187,6 +187,14 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle Schedule schedule = flowConfig.getSchedule(); configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, schedule.getCronSchedule()); configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, schedule.isRunImmediately()); + } else { + // If the job does not have schedule, it is a run-once job. + // In this case, we add flow execution id to the flow spec now to be able to send this id back to the user for + // flow status tracking purpose. + // If it is not a run-once job, we should not add flow execution id here, + // because execution id is generated for every scheduled execution of the flow and cannot be materialized to + // the flow catalog. In this case, this id is added during flow compilation. 
+ configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.valueOf(System.currentTimeMillis())); } Config config = configBuilder.build(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java new file mode 100644 index 0000000..bd77858 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.CreateResponse; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +@Slf4j +public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler { + public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) { + super(flowCatalog); + } + @Override + /** + * Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true + */ + public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException { + log.info("[GAAS-REST] Create called with flowGroup " + flowConfig.getId().getFlowGroup() + " flowName " + flowConfig.getId().getFlowName()); + FlowSpec flowSpec = createFlowSpecForConfig(flowConfig); + this.flowCatalog.put(flowSpec, triggerListener); + FlowStatusId flowStatusId = new FlowStatusId() + .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)) + .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY)); + if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { + flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))); + } else { + flowStatusId.setFlowExecutionId(-1L); + } + return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), HttpStatus.S_201_CREATED); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java 
---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java new file mode 100644 index 0000000..7053e39 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Inject; +import com.google.inject.name.Named; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.server.CreateResponse; +import com.linkedin.restli.server.UpdateResponse; +import com.linkedin.restli.server.annotations.RestLiCollection; +import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate; +/** + * Resource for handling flow configuration requests + */ +@RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id") +public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> { + private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class); + private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); + + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL") + public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null; + + @Inject + @Named("flowConfigsV2ResourceHandler") + private FlowConfigsResourceHandler flowConfigsResourceHandler; + + // For blocking use of this resource until it is ready + @Inject + @Named("readyToUse") + private Boolean readyToUse = Boolean.FALSE; + + public FlowConfigsV2Resource() { + } + + /** + * Retrieve the flow configuration with the given key + * @param key flow config id key containing group name and flow name + * @return {@link FlowConfig} with flow configuration + */ + @Override + public FlowConfig get(ComplexResourceKey<FlowId, FlowStatusId> key) { + String flowGroup = key.getKey().getFlowGroup(); + String flowName = key.getKey().getFlowName(); + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + return 
this.getFlowConfigResourceHandler().getFlowConfig(flowId); + } + + /** + * Create a flow configuration that the service will forward to execution instances for execution + * @param flowConfig flow configuration + * @return {@link CreateResponse} + */ + @Override + public CreateResponse create(FlowConfig flowConfig) { + return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig); + } + + /** + * Update the flow configuration with the specified key. Running flows are not affected. + * An error is raised if the flow configuration does not exist. + * @param key composite key containing group name and flow name that identifies the flow to update + * @param flowConfig new flow configuration + * @return {@link UpdateResponse} + */ + @Override + public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) { + String flowGroup = key.getKey().getFlowGroup(); + String flowName = key.getKey().getFlowName(); + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + return this.getFlowConfigResourceHandler().updateFlowConfig(flowId, flowConfig); + } + + /** + * Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows. 
+ * @param key composite key containing flow group and flow name that identifies the flow to remove from the flow catalog + * @return {@link UpdateResponse} + */ + @Override + public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) { + String flowGroup = key.getKey().getFlowGroup(); + String flowName = key.getKey().getFlowName(); + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, getHeaders()); + } + + private FlowConfigsResourceHandler getFlowConfigResourceHandler() { + if (global_flowConfigsResourceHandler != null) { + return global_flowConfigsResourceHandler; + } + return flowConfigsResourceHandler; + } + + private Properties getHeaders() { + Properties headerProperties = new Properties(); + for (Map.Entry<String, String> entry : getContext().getRequestHeaders().entrySet()) { + if (ALLOWED_METADATA.contains(entry.getKey())) { + headerProperties.put(entry.getKey(), entry.getValue()); + } + } + return headerProperties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 071c126..9c1b42f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -78,6 +78,7 @@ import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; import org.apache.gobblin.service.FlowConfigResourceLocalHandler; +import 
org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler; import org.apache.gobblin.service.FlowConfigsResource; import org.apache.gobblin.service.FlowConfigsResourceHandler; import org.apache.gobblin.service.FlowId; @@ -122,6 +123,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri protected GobblinServiceJobScheduler scheduler; @Getter protected GobblinServiceFlowConfigResourceHandler resourceHandler; + @Getter + protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler; protected boolean flowCatalogLocalCommit; protected Orchestrator orchestrator; @@ -219,13 +222,21 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri this.helixManager, this.scheduler); + this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName, + this.flowCatalogLocalCommit, + new FlowConfigV2ResourceLocalHandler(this.flowCatalog), + this.helixManager, + this.scheduler); + this.isRestLIServerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true); + if (isRestLIServerEnabled) { Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler); + binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(GobblinServiceManager.this.v2ResourceHandler); binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); } }); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 6604676..adef5cc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import javax.annotation.Nonnull; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; @@ -33,32 +35,30 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; -import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.api.TopologySpec; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; +import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.ServiceMetricNames; import 
org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; - -import lombok.Getter; -import lombok.Setter; // Provide base implementation for constructing multi-hops route. @Alpha @@ -250,7 +250,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { } // Add flow execution id for this compilation - long flowExecutionId = System.currentTimeMillis(); + long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec); jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java new file mode 100644 index 0000000..2743d27 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.flow; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +/** Static helpers for {@link FlowSpec}s. NOTE(review): the class declares no private constructor, so it can be instantiated; consider adding one (and making the class final). */ public class FlowUtils { + /** + * Returns the flow execution id stored under {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} in the spec's config, or the current time in milliseconds if that key is absent. + * A FlowSpec is expected to carry this key when it is a runOnce flow; see {@link org.apache.gobblin.service.FlowConfigResourceLocalHandler#createFlowSpecForConfig} for details. + * @param spec flow spec whose config is checked for {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} + * @return the configured execution id if present, otherwise {@link System#currentTimeMillis()} at call time + */ + public static long getOrCreateFlowExecutionId(FlowSpec spec) { + long flowExecutionId; + if (spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { + flowExecutionId = spec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY); + } else { + flowExecutionId = System.currentTimeMillis(); /* no configured id: fall back to the current wall-clock time in ms */ + } + return flowExecutionId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java index 236f927..6b0b14a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java +++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java @@ -17,7 +17,6 @@ package org.apache.gobblin.service.modules.flow; -import com.google.common.base.Splitter; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -28,41 +27,43 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.jgrapht.graph.DirectedWeightedMultigraph; +import org.slf4j.Logger; + import com.google.common.base.Optional; +import com.google.common.base.Splitter; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; -import org.apache.commons.lang3.StringUtils; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.policy.ServicePolicy; -import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; -import org.apache.gobblin.util.ClassAliasResolver; -import org.apache.gobblin.util.ConfigUtils; -import org.jgrapht.graph.DirectedWeightedMultigraph; -import org.slf4j.Logger; +import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.runtime.api.FlowEdge; -import org.apache.gobblin.runtime.api.ServiceNode; import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.TopologySpec; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl; import org.apache.gobblin.runtime.api.JobSpec; import 
org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; +import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.policy.ServicePolicy; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -import static org.apache.gobblin.service.ServiceConfigKeys.*; -import static org.apache.gobblin.service.modules.utils.FindPathUtils.*; +import static org.apache.gobblin.service.ServiceConfigKeys.SERVICE_POLICY_NAME; +import static org.apache.gobblin.service.modules.utils.FindPathUtils.dijkstraBasedPathFindingHelper; // Users are capable to inject hints/prioritization into route selection, in two forms: // 1. 
PolicyBasedBlockedConnection: Define some undesired routes @@ -309,7 +310,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { } // Add flow execution id for this compilation - long flowExecutionId = System.currentTimeMillis(); + long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec); jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java index 9901c2f..6c1d0b2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java @@ -41,6 +41,7 @@ import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; import org.apache.gobblin.service.modules.flow.FlowEdgeContext; import org.apache.gobblin.service.modules.flow.FlowGraphPath; +import org.apache.gobblin.service.modules.flow.FlowUtils; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DataNode; import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; @@ -78,6 +79,7 @@ public abstract class AbstractPathFinder implements PathFinder { this.flowGraph = flowGraph; this.flowSpec = flowSpec; this.flowConfig = flowSpec.getConfig(); + this.flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec); 
//Get src/dest DataNodes from the flow config String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""); @@ -237,8 +239,6 @@ public abstract class AbstractPathFinder implements PathFinder { @Override public FlowGraphPath findPath() throws PathFinderException { - // Generate flow execution id for this compilation - this.flowExecutionId = System.currentTimeMillis(); FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId); //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
