[GOBBLIN-458] Refactor flowConfig resource handler Closes #2329 from yukuai518/delete
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/01302a6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/01302a6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/01302a6d Branch: refs/heads/master Commit: 01302a6db2c468507c0adea5daa69de997aaf14b Parents: 2f76947 Author: Kuai Yu <[email protected]> Authored: Wed May 16 11:40:22 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed May 16 11:40:22 2018 -0700 ---------------------------------------------------------------------- .../gobblin/service/ServiceConfigKeys.java | 5 + .../apache/gobblin/service/FlowConfigTest.java | 2 +- .../service/FlowConfigLoggedException.java | 41 ++++ .../service/FlowConfigResourceLocalHandler.java | 202 ++++++++++++++++++ .../gobblin/service/FlowConfigsResource.java | 182 ++-------------- .../service/FlowConfigsResourceHandler.java | 47 +++++ .../runtime/spec_catalog/FlowCatalog.java | 34 +-- ...trollerUserDefinedMessageHandlerFactory.java | 206 +++++++++++++++++++ .../service/modules/core/GitConfigMonitor.java | 7 +- .../modules/core/GobblinServiceManager.java | 162 ++++----------- .../service/modules/restli/FlowConfigUtils.java | 105 ++++++++++ ...GobblinServiceFlowConfigResourceHandler.java | 206 +++++++++++++++++++ .../scheduler/GobblinServiceJobScheduler.java | 48 +---- .../modules/core/GobblinServiceHATest.java | 58 +++++- .../modules/restli/FlowConfigUtilsTest.java | 164 +++++++++++++++ gobblin-service/src/test/resources/log4j.xml | 40 ++++ .../apache/gobblin/util/PropertiesUtils.java | 21 +- 17 files changed, 1177 insertions(+), 353 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java ---------------------------------------------------------------------- diff 
--git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java index 7231e0c..72e06a0 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -47,6 +47,11 @@ public class ServiceConfigKeys { // Flow Compiler Keys public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class"; + + // Flow Catalog Keys + public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit"; + public static final boolean DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = true; + /** * Directly use canonical class name here to avoid introducing additional dependency here. */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java index a373762..951103c 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java @@ -82,7 +82,7 @@ public class FlowConfigTest { Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { - 
binder.bind(FlowCatalog.class).annotatedWith(Names.named("flowCatalog")).toInstance(flowCatalog); + binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(new FlowConfigResourceLocalHandler(flowCatalog)); // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have // been made binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java new file mode 100644 index 0000000..848498a --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigLoggedException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.RestLiServiceException; + +/** + * Exception thrown by {@link FlowConfigsResourceHandler} when it cannot handle Restli gracefully. + */ +public class FlowConfigLoggedException extends RestLiServiceException { + private static final Logger log = LoggerFactory.getLogger(FlowConfigLoggedException.class); + + public FlowConfigLoggedException(final HttpStatus status, final String message) { + super(status, message); + log.error(message); + } + + public FlowConfigLoggedException(final HttpStatus status, final String message, final Throwable cause) { + super(status, message, cause); + log.error(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java new file mode 100644 index 0000000..0007b6a --- /dev/null +++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; + +import com.google.common.collect.Maps; +import com.linkedin.data.template.StringMap; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.CreateResponse; +import com.linkedin.restli.server.UpdateResponse; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; + +/** + * A {@link FlowConfigsResourceHandler} that handles Restli 
locally. + */ +@Slf4j +public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler { + @Getter + private FlowCatalog flowCatalog; + public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) { + this.flowCatalog = flowCatalog; + } + + /** + * Get flow config + */ + public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException { + log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName()); + + try { + URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); + URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), + "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null); + FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri); + FlowConfig flowConfig = new FlowConfig(); + Properties flowProps = spec.getConfigAsProperties(); + Schedule schedule = null; + + if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + schedule = new Schedule(); + schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)); + } + if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) { + flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH)); + } else if (spec.getTemplateURIs().isPresent()) { + flowConfig.setTemplateUris(StringUtils.join(spec.getTemplateURIs().get(), ",")); + } else { + flowConfig.setTemplateUris("NA"); + } + if (schedule != null) { + if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) { + schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY))); + } + + flowConfig.setSchedule(schedule); + } + + // remove keys that were injected as part of flowSpec creation + flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY); + flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH); + + StringMap flowPropsAsStringMap = new StringMap(); + flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps)); + + 
return flowConfig.setId(new FlowId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())) + .setProperties(flowPropsAsStringMap); + } catch (URISyntaxException e) { + throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowId.getFlowName(), e); + } catch (SpecNotFoundException e) { + throw new FlowConfigLoggedException(HttpStatus.S_404_NOT_FOUND, "Flow requested does not exist: " + flowId.getFlowName(), null); + } + } + + /** + * Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true + */ + public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException { + log.info("[GAAS-REST] Create called with flowGroup " + flowConfig.getId().getFlowGroup() + " flowName " + flowConfig.getId().getFlowName()); + FlowSpec flowSpec = createFlowSpecForConfig(flowConfig); + this.flowCatalog.put(flowSpec, triggerListener); + return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); + } + + /** + * Add flowConfig locally and trigger all listeners + */ + public CreateResponse createFlowConfig(FlowConfig flowConfig) throws FlowConfigLoggedException { + return this.createFlowConfig(flowConfig, true); + } + + /** + * Update flowConfig locally and trigger all listeners iff @param triggerListener is set to true + */ + public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener) { + log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName()); + + if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || !flowId.getFlowName().equals(flowConfig.getId().getFlowName())) { + throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, + "flowName and flowGroup cannot be changed in update", null); + } + + this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener); + return new 
UpdateResponse(HttpStatus.S_200_OK); + } + + /** + * Update flowConfig locally and trigger all listeners + */ + public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws FlowConfigLoggedException { + return updateFlowConfig(flowId, flowConfig, true); + } + + /** + * Delete flowConfig locally and trigger all listeners iff @param triggerListener is set to true + */ + public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header, boolean triggerListener) throws FlowConfigLoggedException { + + log.info("[GAAS-REST] Delete called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName()); + URI flowUri = null; + + try { + flowUri = createFlowSpecUri(flowId); + this.flowCatalog.remove(flowUri, header, triggerListener); + return new UpdateResponse(HttpStatus.S_200_OK); + } catch (URISyntaxException e) { + throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e); + } + } + + /** + * Delete flowConfig locally and trigger all listeners + */ + public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws FlowConfigLoggedException { + return deleteFlowConfig(flowId, header, true); + } + + public static URI createFlowSpecUri (FlowId flowId) throws URISyntaxException { + URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); + URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), + "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null); + return flowUri; + } + + /** + * Build a {@link FlowSpec} from a {@link FlowConfig} + * @param flowConfig flow configuration + * @return {@link FlowSpec} created with attributes from flowConfig + */ + public static FlowSpec createFlowSpecForConfig(FlowConfig flowConfig) { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, flowConfig.getId().getFlowGroup()) + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
flowConfig.getId().getFlowName()); + + if (flowConfig.hasSchedule()) { + Schedule schedule = flowConfig.getSchedule(); + configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, schedule.getCronSchedule()); + configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, schedule.isRunImmediately()); + } + + Config config = configBuilder.build(); + Config configWithFallback = config.withFallback(ConfigFactory.parseMap(flowConfig.getProperties())); + + try { + URI templateURI = new URI(flowConfig.getTemplateUris()); + return FlowSpec.builder().withConfig(configWithFallback).withTemplate(templateURI).build(); + } catch (URISyntaxException e) { + throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java index 9074a43..8c91ca7 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java @@ -17,38 +17,22 @@ package org.apache.gobblin.service; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Map; import java.util.Properties; import java.util.Set; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-import com.google.common.collect.Maps; +import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import com.google.inject.name.Named; -import com.linkedin.data.template.StringMap; import com.linkedin.restli.common.ComplexResourceKey; import com.linkedin.restli.common.EmptyRecord; -import com.linkedin.restli.common.HttpStatus; import com.linkedin.restli.server.CreateResponse; -import com.linkedin.restli.server.RestLiServiceException; import com.linkedin.restli.server.UpdateResponse; import com.linkedin.restli.server.annotations.RestLiCollection; import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; - -import com.google.common.collect.ImmutableSet; /** @@ -61,33 +45,18 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL") - public static FlowCatalog _globalFlowCatalog; + public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null; @Inject - @Named("flowCatalog") - private FlowCatalog _flowCatalog; + @Named("flowConfigsResourceHandler") + private FlowConfigsResourceHandler flowConfigsResourceHandler; // For blocking use of this resource until it is ready @Inject @Named("readyToUse") private Boolean readyToUse = Boolean.FALSE; - public FlowConfigsResource() {} - - /** - * Logs message and throws Rest.li exception - * @param status HTTP status code - * @param msg error message - * @param e exception - */ - public void logAndThrowRestLiServiceException(HttpStatus status, String msg, Exception e) { - if (e != null) { - LOG.error(msg, e); - throw new 
RestLiServiceException(status, msg + " cause = " + e.getMessage()); - } else { - LOG.error(msg); - throw new RestLiServiceException(status, msg); - } + public FlowConfigsResource() { } /** @@ -99,82 +68,8 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt public FlowConfig get(ComplexResourceKey<FlowId, EmptyRecord> key) { String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); - - LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName); - - try { - URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); - URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), - "/" + flowGroup + "/" + flowName, null, null); - FlowSpec spec = (FlowSpec) getFlowCatalog().getSpec(flowUri); - FlowConfig flowConfig = new FlowConfig(); - Properties flowProps = spec.getConfigAsProperties(); - Schedule schedule = null; - - if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { - schedule = new Schedule(); - schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)); - } - if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) { - flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH)); - } else if (spec.getTemplateURIs().isPresent()) { - flowConfig.setTemplateUris(StringUtils.join(spec.getTemplateURIs().get(), ",")); - } else { - flowConfig.setTemplateUris("NA"); - } - if (schedule != null) { - if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) { - schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY))); - } - - flowConfig.setSchedule(schedule); - } - - // remove keys that were injected as part of flowSpec creation - flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY); - flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH); - - StringMap flowPropsAsStringMap = new StringMap(); - 
flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps)); - - return flowConfig.setId(new FlowId().setFlowGroup(flowGroup).setFlowName(flowName)) - .setProperties(flowPropsAsStringMap); - } catch (URISyntaxException e) { - logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowName, e); - } catch (SpecNotFoundException e) { - logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow requested does not exist: " + flowName, null); - } - - return null; - } - - /** - * Build a {@link FlowSpec} from a {@link FlowConfig} - * @param flowConfig flow configuration - * @return {@link FlowSpec} created with attributes from flowConfig - */ - private FlowSpec createFlowSpecForConfig(FlowConfig flowConfig) { - ConfigBuilder configBuilder = ConfigBuilder.create() - .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, flowConfig.getId().getFlowGroup()) - .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, flowConfig.getId().getFlowName()); - - if (flowConfig.hasSchedule()) { - Schedule schedule = flowConfig.getSchedule(); - configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, schedule.getCronSchedule()); - configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, schedule.isRunImmediately()); - } - - Config config = configBuilder.build(); - Config configWithFallback = config.withFallback(ConfigFactory.parseMap(flowConfig.getProperties())); - - URI templateURI = null; - try { - templateURI = new URI(flowConfig.getTemplateUris()); - } catch (URISyntaxException e) { - logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e); - } - - return FlowSpec.builder().withConfig(configWithFallback).withTemplate(templateURI).build(); + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + return this.getFlowConfigResourceHandler().getFlowConfig(flowId); } /** @@ -184,18 +79,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt */ @Override 
public CreateResponse create(FlowConfig flowConfig) { - LOG.info("Create called with flowName " + flowConfig.getId().getFlowName()); - - LOG.debug("ReadyToUse is: " + readyToUse); - LOG.debug("FlowCatalog is: " + getFlowCatalog()); - - if (!readyToUse && getFlowCatalog() == null) { - throw new RuntimeException("Not ready for use."); - } - - getFlowCatalog().put(createFlowSpecForConfig(flowConfig)); - - return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); + return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig); } /** @@ -209,46 +93,29 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) { String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); - - LOG.info("Update called with flowGroup " + flowGroup + " flowName " + flowName); - - if (!flowGroup.equals(flowConfig.getId().getFlowGroup()) || !flowName.equals(flowConfig.getId().getFlowName())) { - logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, - "flowName and flowGroup cannot be changed in update", null); - } - - getFlowCatalog().put(createFlowSpecForConfig(flowConfig)); - - return new UpdateResponse(HttpStatus.S_200_OK); + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + return this.getFlowConfigResourceHandler().updateFlowConfig(flowId, flowConfig); } /** * Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows. 
- * @param key composite key containing flow group and flow name that identifies the flow to remove from the - * {@link FlowCatalog} + * @param key composite key containing flow group and flow name that identifies the flow to remove from the flow catalog * @return {@link UpdateResponse} */ @Override public UpdateResponse delete(ComplexResourceKey<FlowId, EmptyRecord> key) { String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); - URI flowUri = null; - - LOG.info("Delete called with flowGroup " + flowGroup + " flowName " + flowName); - - try { - URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); - flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), - "/" + flowGroup + "/" + flowName, null, null); - - getFlowCatalog().remove(flowUri, getHeaders()); + FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, getHeaders()); + } - return new UpdateResponse(HttpStatus.S_200_OK); - } catch (URISyntaxException e) { - logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e); + private FlowConfigsResourceHandler getFlowConfigResourceHandler() { + if (global_flowConfigsResourceHandler != null) { + return global_flowConfigsResourceHandler; } - return null; + return flowConfigsResourceHandler; } private Properties getHeaders() { @@ -260,18 +127,5 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt } return headerProperties; } - - /*** - * This method is to workaround injection issues where Service has only one active global FlowCatalog - * .. and is not able to inject it via RestLI bootstrap. We should remove this and make injected - * .. FlowCatalog standard after injection works and recipe is documented here. - * @return FlowCatalog in use. 
- */ - private FlowCatalog getFlowCatalog() { - if (null != _globalFlowCatalog) { - return _globalFlowCatalog; - } - return this._flowCatalog; - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java new file mode 100644 index 0000000..b92ab9d --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.util.Properties; + +import com.linkedin.restli.server.CreateResponse; +import com.linkedin.restli.server.UpdateResponse; + + +public interface FlowConfigsResourceHandler { + /** + * Get flow config + */ + FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException; + + /** + * Add flow config can be done locally only iff current node is a master + */ + CreateResponse createFlowConfig(FlowConfig flowConfig) throws FlowConfigLoggedException; + + /** + * Update flow config can be done locally only iff current node is a master + */ + UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws FlowConfigLoggedException; + + /** + * Delete flow config can be done locally only iff current node is a master + */ + UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws FlowConfigLoggedException; + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index f9ae420..3422d66 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -25,25 +25,25 @@ import java.util.Collections; import java.util.List; import java.util.Properties; -import javax.annotation.Nonnull; - import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.reflect.ConstructorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; -import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.base.Preconditions; 
+import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; +import javax.annotation.Nonnull; + +import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; -import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; import org.apache.gobblin.runtime.api.MutableSpecCatalog; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalog; @@ -91,8 +91,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build(); this.metrics = new MutableStandardMetrics(this, Optional.of(config)); this.addListener(this.metrics); - } - else { + } else { this.metricContext = null; this.metrics = null; } @@ -240,8 +239,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut } } - @Override - public void put(Spec spec) { + public void put(Spec spec, boolean triggerListener) { try { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(spec); @@ -251,18 +249,29 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut ((FlowSpec) spec).getConfigAsProperties())); specStore.addSpec(spec); metrics.updatePutSpecTime(startTime); - this.listeners.onAddSpec(spec); + if (triggerListener) { + this.listeners.onAddSpec(spec); + } } catch (IOException e) { throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e); } } + @Override + public void put(Spec spec) { + put(spec, true); + } + public void remove(URI uri) { 
remove(uri, new Properties()); } @Override public void remove(URI uri, Properties headers) { + this.remove(uri, headers, true); + } + + public void remove(URI uri, Properties headers, boolean triggerListener) { try { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(uri); @@ -270,8 +279,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut log.info(String.format("Removing FlowSpec with URI: %s", uri)); specStore.deleteSpec(uri); this.metrics.updateRemoveSpecTime(startTime); - this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers); - + if (triggerListener) { + this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers); + } } catch (IOException e) { throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java new file mode 100644 index 0000000..d02d861 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.core; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.handling.HelixTaskResult; +import org.apache.helix.messaging.handling.MessageHandler; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.Message; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.service.FlowConfig; +import org.apache.gobblin.service.FlowConfigResourceLocalHandler; +import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.restli.FlowConfigUtils; +import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + +/** + * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that + * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}. 
+ */ +@AllArgsConstructor +class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory { + private boolean flowCatalogLocalCommit; + private GobblinServiceJobScheduler jobScheduler; + private GobblinServiceFlowConfigResourceHandler resourceHandler; + private String serviceName; + + @Override + public MessageHandler createHandler(Message message, NotificationContext context) { + return new ControllerUserDefinedMessageHandler(message, context, serviceName, flowCatalogLocalCommit, jobScheduler, resourceHandler); + } + + @Override + public String getMessageType() { + return Message.MessageType.USER_DEFINE_MSG.toString(); + } + + public List<String> getMessageTypes() { + return Collections.singletonList(getMessageType()); + } + + @Override + public void reset() { + + } + + /** + * A custom {@link MessageHandler} for handling user-defined messages to the controller. + */ + @Slf4j + private static class ControllerUserDefinedMessageHandler extends MessageHandler { + private boolean flowCatalogLocalCommit; + private GobblinServiceJobScheduler jobScheduler; + private GobblinServiceFlowConfigResourceHandler resourceHandler; + private String serviceName; + + public ControllerUserDefinedMessageHandler(Message message, NotificationContext context, String serviceName, + boolean flowCatalogLocalCommit, GobblinServiceJobScheduler scheduler, + GobblinServiceFlowConfigResourceHandler resourceHandler) { + super(message, context); + this.serviceName = serviceName; + this.flowCatalogLocalCommit = flowCatalogLocalCommit; + this.jobScheduler = scheduler; + this.resourceHandler = resourceHandler; + } + + /** + * Method to handle add flow config message forwarded by Helix (Standby) node. + * In load balance mode, the FlowCatalog I/O was handled on standby when receiving Restli, so only need to handle + * {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onAddSpec(Spec)} part. 
+ * Otherwise, we have to handle both FlowCatalog I/O and {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onAddSpec(Spec)}. + * + * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It will handle both FlowCatalog I/O and + * {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onAddSpec(Spec)} in non-balance mode. + */ + private void handleAdd(String msg) + throws IOException { + FlowConfig config = FlowConfigUtils.deserializeFlowConfig(msg); + if (this.flowCatalogLocalCommit) { + // in balance mode, flow spec is already added in flow catalog on standby node. + FlowSpec flowSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(config); + log.info("Only handle add {} scheduling because flow catalog is committed locally on standby.", flowSpec); + jobScheduler.onAddSpec(flowSpec); + } else { + resourceHandler.createFlowConfig(config); + } + } + + /** + * Method to handle update flow config message forwarded by Helix (Standby) node. + * In load balance mode, the FlowCatalog I/O was handled on standby when receiving Restli, so only need to handle + * {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onUpdateSpec(Spec)} part. + * Otherwise, we have to handle both FlowCatalog I/O and {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onUpdateSpec(Spec)}. + * + * Please refer to {@link FlowConfigResourceLocalHandler#updateFlowConfig(FlowId, FlowConfig)}. It will handle both FlowCatalog I/O and + * {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onUpdateSpec(Spec)} in non-balance mode. + */ + private void handleUpdate(String msg) + throws IOException { + FlowConfig config = FlowConfigUtils.deserializeFlowConfig(msg); + if (flowCatalogLocalCommit) { + // in balance mode, flow spec is already updated in flow catalog on standby node. 
+ FlowSpec flowSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(config); + log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", flowSpec); + jobScheduler.onUpdateSpec(flowSpec); + } else { + resourceHandler.updateFlowConfig(config.getId(), config); + } + } + + /** + * Method to handle delete flow config message forwarded by Helix (Standby) node. + * In load balance mode, the FlowCatalog I/O was handled on standby when receiving Restli, so only need to handle + * {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onDeleteSpec(URI, String, Properties)} part. + * Otherwise, we have to handle both FlowCatalog I/O and {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onDeleteSpec(URI, String, Properties)}. + * + * Please refer to {@link FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)}. It will handle both FlowCatalog I/O and + * {@link org.apache.gobblin.runtime.api.SpecCatalogListener#onDeleteSpec(URI, String, Properties)} in non-balance mode. + */ + private void handleDelete(String msg) + throws IOException { + try { + FlowId id = FlowConfigUtils.deserializeFlowId(msg); + if (flowCatalogLocalCommit) { + // in balance mode, flow spec is already deleted in flow catalog on standby node. 
+ URI flowUri = FlowConfigResourceLocalHandler.createFlowSpecUri(id); + log.info("Only handle delete {} scheduling because flow catalog is committed locally on standby.", flowUri); + jobScheduler.onDeleteSpec(flowUri, FlowSpec.Builder.DEFAULT_VERSION); + } else { + resourceHandler.deleteFlowConfig(id, new Properties()); + } + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + @Override + public HelixTaskResult handleMessage() + throws InterruptedException { + if (jobScheduler.isActive()) { + // we want to make sure current node is in active state + String msg = _message.getAttribute(Message.Attributes.INNER_MESSAGE); + log.info("{} ControllerUserDefinedMessage received : {}, type {}", this.serviceName, msg, _message.getMsgSubType()); + try { + if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) { + handleAdd(msg); + } else if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE)) { + handleDelete(msg); + } else if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE)) { + handleUpdate(msg); + } + } catch (IOException e) { + log.error("Cannot process Helix message.", e); + HelixTaskResult helixTaskResult = new HelixTaskResult(); + helixTaskResult.setSuccess(false); + return helixTaskResult; + } + } else { + String msg = _message.getAttribute(Message.Attributes.INNER_MESSAGE); + log.error("ControllerUserDefinedMessage received but ignored due to not in active mode: {}, type {}", msg, + _message.getMsgSubType()); + } + HelixTaskResult helixTaskResult = new HelixTaskResult(); + helixTaskResult.setSuccess(true); + + return helixTaskResult; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + log.error( + String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type), e); + } + } +} + + 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java index b20e3b7..09d7bb4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java @@ -25,8 +25,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.runtime.spec_store.FSSpecStore; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,16 +48,17 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.runtime.spec_store.FSSpecStore; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PullFileLoader; -import lombok.extern.slf4j.Slf4j; - /** * Service that monitors for jobs from a git repository. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 3137c21..0daea79 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -17,29 +17,14 @@ package org.apache.gobblin.service.modules.core; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareHistogram; -import org.apache.gobblin.metrics.ContextAwareMetric; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.service.FlowId; -import org.apache.gobblin.service.Schedule; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.net.URISyntaxException; import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import lombok.Getter; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; @@ -52,21 +37,16 @@ import org.apache.hadoop.fs.Path; import org.apache.helix.ControllerChangeListener; import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; -import org.apache.helix.messaging.handling.HelixTaskResult; -import 
org.apache.helix.messaging.handling.MessageHandler; -import org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.model.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Splitter; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.eventbus.EventBus; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.eventbus.EventBus; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -78,32 +58,42 @@ import com.linkedin.restli.server.resources.BaseResource; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import javax.annotation.Nonnull; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.restli.EmbeddedRestliServer; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.app.ApplicationException; import org.apache.gobblin.runtime.app.ApplicationLauncher; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; +import org.apache.gobblin.service.FlowConfigResourceLocalHandler; 
import org.apache.gobblin.service.FlowConfigsResource; -import org.apache.gobblin.service.modules.utils.HelixUtils; +import org.apache.gobblin.service.FlowConfigsResourceHandler; +import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.Orchestrator; +import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; import org.apache.gobblin.service.modules.topology.TopologySpecFactory; -import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; +import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; @Alpha -public class GobblinServiceManager implements ApplicationLauncher, StandardMetricsBridge{ +public class GobblinServiceManager implements ApplicationLauncher, StandardMetricsBridge { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class); @@ -115,7 +105,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri protected final FileSystem fs; protected final Path serviceWorkDir; - + protected final String serviceName; protected final String serviceId; protected final boolean isTopologyCatalogEnabled; @@ -130,6 +120,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri protected FlowCatalog flowCatalog; @Getter protected GobblinServiceJobScheduler scheduler; + @Getter + protected GobblinServiceFlowConfigResourceHandler resourceHandler; + + protected boolean flowCatalogLocalCommit; protected Orchestrator orchestrator; protected EmbeddedRestliServer restliServer; protected TopologySpecFactory topologySpecFactory; @@ -142,7 +136,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri @Getter protected 
Config config; - private final MetricContext metricContext; private final Metrics metrics; @@ -157,6 +150,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri this.config = config; this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); this.metrics = new Metrics(this.metricContext, this.config); + this.serviceName = serviceName; this.serviceId = serviceId; this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName); @@ -177,6 +171,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true); if (isFlowCatalogEnabled) { this.flowCatalog = new FlowCatalog(config, Optional.of(LOGGER)); + this.flowCatalogLocalCommit = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT, + ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT); this.serviceLauncher.addService(flowCatalog); this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config, @@ -208,7 +204,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri if (isSchedulerEnabled) { this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.of(LOGGER)); SchedulerService schedulerService = new SchedulerService(properties); - this.scheduler = new GobblinServiceJobScheduler(config, this.helixManager, + + this.scheduler = new GobblinServiceJobScheduler(this.serviceName, config, this.helixManager, Optional.of(this.flowCatalog), Optional.of(this.topologyCatalog), this.orchestrator, schedulerService, Optional.of(LOGGER)); this.serviceLauncher.addService(schedulerService); @@ -216,13 +213,19 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri } // Initialize RestLI + this.resourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName, + this.flowCatalogLocalCommit, + new 
FlowConfigResourceLocalHandler(this.flowCatalog), + this.helixManager, + this.scheduler); + this.isRestLIServerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true); if (isRestLIServerEnabled) { Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { - binder.bind(FlowCatalog.class).annotatedWith(Names.named("flowCatalog")).toInstance(flowCatalog); + binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler); binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); } }); @@ -259,6 +262,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri } } + @VisibleForTesting public boolean isLeader() { // If helix manager is absent, then this standalone instance hence leader // .. else check if this master of cluster @@ -411,7 +415,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri this.helixManager.get() .getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), - getUserDefinedMessageHandlerFactory()); + new ControllerUserDefinedMessageHandlerFactory(flowCatalogLocalCommit, scheduler, resourceHandler, serviceName)); } } catch (Exception e) { LOGGER.error("HelixManager failed to connect", e); @@ -419,16 +423,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri } } - /** - * Creates and returns a {@link MessageHandlerFactory} for handling of Helix - * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s. - * - * @returns a {@link MessageHandlerFactory}. 
- */ - protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() { - return new ControllerUserDefinedMessageHandlerFactory(this.flowCatalog, this.scheduler); - } - @VisibleForTesting void disconnectHelixManager() { if (isHelixManagerConnected()) { @@ -475,92 +469,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri } } - /** - * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that - * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}. - */ - private static class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory { - - private FlowCatalog flowCatalog; - private GobblinServiceJobScheduler jobScheduler; - - public ControllerUserDefinedMessageHandlerFactory(FlowCatalog flowCatalog, GobblinServiceJobScheduler jobScheduler) { - this.flowCatalog = flowCatalog; - this.jobScheduler = jobScheduler; - } - - @Override - public MessageHandler createHandler(Message message, NotificationContext context) { - return new ControllerUserDefinedMessageHandler(flowCatalog, jobScheduler, message, context); - } - - @Override - public String getMessageType() { - return Message.MessageType.USER_DEFINE_MSG.toString(); - } - - public List<String> getMessageTypes() { - return Collections.singletonList(getMessageType()); - } - - @Override - public void reset() { - - } - - /** - * A custom {@link MessageHandler} for handling user-defined messages to the controller. 
- */ - private static class ControllerUserDefinedMessageHandler extends MessageHandler { - - private FlowCatalog flowCatalog; - private GobblinServiceJobScheduler jobScheduler; - - public ControllerUserDefinedMessageHandler(FlowCatalog flowCatalog, GobblinServiceJobScheduler jobScheduler, - Message message, NotificationContext context) { - super(message, context); - - this.flowCatalog = flowCatalog; - this.jobScheduler = jobScheduler; - } - - @Override - public HelixTaskResult handleMessage() throws InterruptedException { - if (jobScheduler.isActive()) { - String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE); - LOGGER.info ("ControllerUserDefinedMessage received : {}, type {}", flowSpecUri, _message.getMsgSubType()); - try { - if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) { - Spec spec = flowCatalog.getSpec(new URI(flowSpecUri)); - this.jobScheduler.onAddSpec(spec); - } else if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE)) { - List<String> flowSpecUriParts = Splitter.on(":").omitEmptyStrings().trimResults().splitToList(flowSpecUri); - this.jobScheduler.onDeleteSpec(new URI(flowSpecUriParts.get(0)), flowSpecUriParts.get(1)); - } else if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE)) { - Spec spec = flowCatalog.getSpec(new URI(flowSpecUri)); - this.jobScheduler.onUpdateSpec(spec); - } - } catch (SpecNotFoundException | URISyntaxException e) { - LOGGER.error("Cannot process Helix message for flowSpecUri: " + flowSpecUri, e); - } - } else { - String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE); - LOGGER.info ("ControllerUserDefinedMessage received but ignored due to not in active mode: {}, type {}", flowSpecUri, _message.getMsgSubType()); - } - HelixTaskResult helixTaskResult = new HelixTaskResult(); - helixTaskResult.setSuccess(true); - - return helixTaskResult; - } - - @Override - public void onError(Exception e, ErrorCode code, 
ErrorType type) { - LOGGER.error( - String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); - } - } - } - private static String getServiceId() { return "1"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java new file mode 100644 index 0000000..a59fc85 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.restli; + +import java.io.IOException; +import java.util.Properties; + +import com.google.common.collect.Maps; +import com.linkedin.data.template.StringMap; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.service.FlowConfig; +import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.Schedule; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PropertiesUtils; + +/** Converts {@link FlowId}s and {@link FlowConfig}s to and from {@link PropertiesUtils}-serialized strings; id, schedule and template fields are stored under reserved keys prefixed with {@code fc-}, which deserialization strips from the returned flow properties. */ +public class FlowConfigUtils { + private static final String FLOWCONFIG = "fc"; + private static final String FLOWCONFIG_ID = FLOWCONFIG + '-' + "id"; + private static final String FLOWCONFIG_ID_NAME = FLOWCONFIG_ID + '-' + "name"; + private static final String FLOWCONFIG_ID_GROUP = FLOWCONFIG_ID + '-' + "group"; + + private static final String FLOWCONFIG_SCHEDULE = FLOWCONFIG + '-' + "sch"; + private static final String FLOWCONFIG_SCHEDULE_CRON = FLOWCONFIG_SCHEDULE + '-' + "cron"; + private static final String FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY = FLOWCONFIG_SCHEDULE + '-' + "runImmediately"; + + private static final String FLOWCONFIG_TEMPLATEURIS = FLOWCONFIG + '-' + "templateUris"; + + public static String serializeFlowId(FlowId id) throws IOException { + Properties properties = new Properties(); + properties.setProperty(FLOWCONFIG_ID_NAME, id.getFlowName()); + properties.setProperty(FLOWCONFIG_ID_GROUP, id.getFlowGroup()); + + return PropertiesUtils.serialize(properties); + } + + public static FlowId deserializeFlowId(String serialized) throws IOException { + Properties properties = PropertiesUtils.deserialize(serialized); + FlowId id = new FlowId(); + id.setFlowName(properties.getProperty(FLOWCONFIG_ID_NAME)); + 
id.setFlowGroup(properties.getProperty(FLOWCONFIG_ID_GROUP)); + return id; + } + + public static String serializeFlowConfig(FlowConfig flowConfig) throws IOException { + Properties properties = 
ConfigUtils.configToProperties(ConfigFactory.parseMap(flowConfig.getProperties())); + properties.setProperty(FLOWCONFIG_ID_NAME, flowConfig.getId().getFlowName()); + properties.setProperty(FLOWCONFIG_ID_GROUP, flowConfig.getId().getFlowGroup()); + + if (flowConfig.hasSchedule()) { + properties.setProperty(FLOWCONFIG_SCHEDULE_CRON, flowConfig.getSchedule().getCronSchedule()); + properties.setProperty(FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY, Boolean.toString(flowConfig.getSchedule().isRunImmediately())); + } + + if (flowConfig.hasTemplateUris()) { + properties.setProperty(FLOWCONFIG_TEMPLATEURIS, flowConfig.getTemplateUris()); + } + + return PropertiesUtils.serialize(properties); + } + + public static FlowConfig deserializeFlowConfig(String serialized) throws IOException { + Properties properties = PropertiesUtils.deserialize(serialized); + FlowConfig flowConfig = new FlowConfig().setId(new FlowId() + .setFlowName(properties.getProperty(FLOWCONFIG_ID_NAME)) + .setFlowGroup(properties.getProperty(FLOWCONFIG_ID_GROUP))); + + if (properties.containsKey(FLOWCONFIG_SCHEDULE_CRON)) { + flowConfig.setSchedule(new Schedule() + .setCronSchedule(properties.getProperty(FLOWCONFIG_SCHEDULE_CRON)) + .setRunImmediately(Boolean.valueOf(properties.getProperty(FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY)))); + } + + if (properties.containsKey(FLOWCONFIG_TEMPLATEURIS)) { + flowConfig.setTemplateUris(properties.getProperty(FLOWCONFIG_TEMPLATEURIS)); + } + + properties.remove(FLOWCONFIG_ID_NAME); + properties.remove(FLOWCONFIG_ID_GROUP); + properties.remove(FLOWCONFIG_SCHEDULE_CRON); + properties.remove(FLOWCONFIG_SCHEDULE_RUN_IMMEDIATELY); + properties.remove(FLOWCONFIG_TEMPLATEURIS); + + flowConfig.setProperties(new StringMap(Maps.fromProperties(properties))); + + return flowConfig; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java new file mode 100644 index 0000000..55d9cf0 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.restli; + +import java.io.IOException; +import java.util.Properties; +import java.util.UUID; + +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; + +import com.google.common.base.Optional; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.CreateResponse; +import com.linkedin.restli.server.UpdateResponse; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.service.FlowConfig; +import org.apache.gobblin.service.FlowConfigLoggedException; +import org.apache.gobblin.service.FlowConfigResourceLocalHandler; +import org.apache.gobblin.service.FlowConfigsResourceHandler; +import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; +import org.apache.gobblin.service.modules.utils.HelixUtils; + + +/** + * A high available flow config resource handler which consider the leadership change. + * When a non-master status detected, it will forward the rest-li request + * to the master node. Otherwise it will handle it locally. 
+ */ +@Slf4j +public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResourceHandler { + @Getter + private String serviceName; + private boolean flowCatalogLocalCommit; + private FlowConfigResourceLocalHandler localHandler; + private Optional<HelixManager> helixManager; + private GobblinServiceJobScheduler jobScheduler; + + public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean flowCatalogLocalCommit, + FlowConfigResourceLocalHandler handler, + Optional<HelixManager> manager, + GobblinServiceJobScheduler jobScheduler) { + this.flowCatalogLocalCommit = flowCatalogLocalCommit; + this.serviceName = serviceName; + this.localHandler = handler; + this.helixManager = manager; + this.jobScheduler = jobScheduler; + } + + @Override + public FlowConfig getFlowConfig(FlowId flowId) + throws FlowConfigLoggedException { + return this.localHandler.getFlowConfig(flowId); + } + + /** + * Method to handle create Restli request. + * In load balance mode, we will handle flowCatalog I/O locally before forwarding the message to Helix (Active) node. + * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle flowCatalog I/O. + * + * The listeners of {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in balance mode. + */ + @Override + public CreateResponse createFlowConfig(FlowConfig flowConfig) + throws FlowConfigLoggedException { + String flowName = flowConfig.getId().getFlowName(); + String flowGroup = flowConfig.getId().getFlowGroup(); + + checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, flowGroup); + + try { + if (!jobScheduler.isActive() && helixManager.isPresent()) { + if (this.flowCatalogLocalCommit) { + // We will handle FS I/O locally for load balance before forwarding to remote node. 
+ this.localHandler.createFlowConfig(flowConfig, false); + } + + forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup); + + // Do actual work on remote node, directly return success + return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); + } else { + return this.localHandler.createFlowConfig(flowConfig); + } + } catch (IOException e) { + throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, + "Cannot create flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]", e); + } + } + + /** + * Method to handle update Restli request. + * In load balance mode, we will handle flowCatalog I/O locally before forwarding the message to Helix (Active) node. + * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle flowCatalog I/O. + * + * The listeners of {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in balance mode. + */ + @Override + public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) + throws FlowConfigLoggedException { + String flowName = flowId.getFlowName(); + String flowGroup = flowId.getFlowGroup(); + + if (!flowGroup.equals(flowConfig.getId().getFlowGroup()) || !flowName.equals(flowConfig.getId().getFlowName())) { + throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, + "flowName and flowGroup cannot be changed in update", null); + } + + checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, flowName, flowGroup); + + try { + if (!jobScheduler.isActive() && helixManager.isPresent()) { + + if (this.flowCatalogLocalCommit) { + // We will handle FS I/O locally for load balance before forwarding to remote node. 
+ this.localHandler.updateFlowConfig(flowId, flowConfig, false); + } + + forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup); + + // Do actual work on remote node, directly return success + log.info("Forwarding update flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]"); + return new UpdateResponse(HttpStatus.S_200_OK); + } else { + return this.localHandler.updateFlowConfig(flowId, flowConfig); + } + + } catch (IOException e) { + throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, + "Cannot update flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]", e); + } + } + + /** + * Method to handle delete Restli request. + * In load balance mode, we will handle flowCatalog I/O locally before forwarding the message to Helix (Active) node. + * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle flowCatalog I/O. + * + * The listeners of {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in balance mode. + */ + @Override + public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) + throws FlowConfigLoggedException { + String flowName = flowId.getFlowName(); + String flowGroup = flowId.getFlowGroup(); + + checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, flowName, flowGroup); + + try { + if (!jobScheduler.isActive() && helixManager.isPresent()) { + + if (this.flowCatalogLocalCommit) { + // We will handle FS I/O locally for load balance before forwarding to remote node. 
+ this.localHandler.deleteFlowConfig(flowId, header, false); + } + + forwardMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, FlowConfigUtils.serializeFlowId(flowId), flowName, flowGroup); + + return new UpdateResponse(HttpStatus.S_200_OK); + } else { + return this.localHandler.deleteFlowConfig(flowId, header); + } + } catch (IOException e) { + throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, + "Cannot delete flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]", e); + } + } + + private void checkHelixConnection(String opr, String flowName, String flowGroup) throws FlowConfigLoggedException { + if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) { + // Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore + // .. Specs if in cluster mode and Helix is not yet initialized + log.warn("System not yet initialized. Skipping operation " + opr); + throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, + "System not yet initialized. 
Skipping " + opr + " flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]"); + } + } + + private void forwardMessage(String msgSubType, String val, String flowName, String flowGroup) { + HelixUtils.sendUserDefinedMessage(msgSubType, val, UUID.randomUUID().toString(), InstanceType.CONTROLLER, + helixManager.get(), log); + log.info("{} Forwarding {} flowConfig [flowName={} flowGroup={}", serviceName, msgSubType, flowName, flowGroup + "]"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 328c742..506a53e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -22,14 +22,8 @@ import java.util.Collection; import java.util.Map; import java.util.Properties; -import java.util.UUID; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; import org.quartz.DisallowConcurrentExecution; import org.quartz.InterruptableJob; import org.quartz.JobDataMap; @@ -45,6 +39,9 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.JobException; @@ -57,7 +54,6 @@ import 
org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.scheduler.BaseGobblinJob; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; -import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.util.ConfigUtils; @@ -78,29 +74,29 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata protected final Orchestrator orchestrator; @Getter protected final Map<String, Spec> scheduledFlowSpecs; - @Getter - protected volatile boolean isActive; + private volatile boolean isActive; + private String serviceName; - public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, + public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, SchedulerService schedulerService, Optional<Logger> log) throws Exception { super(ConfigUtils.configToProperties(config), schedulerService); _log = log.isPresent() ? 
log.get() : LoggerFactory.getLogger(getClass()); - + this.serviceName = serviceName; this.flowCatalog = flowCatalog; this.helixManager = helixManager; this.orchestrator = orchestrator; this.scheduledFlowSpecs = Maps.newHashMap(); } - public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, + public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService, Optional<Logger> log) throws Exception { - this(config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, log), + this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, log), schedulerService, log); } @@ -196,13 +192,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata _log.info("New Flow Spec detected: " + addedSpec); if (addedSpec instanceof FlowSpec) { - if (!isActive && helixManager.isPresent()) { - _log.info("Scheduler running in slave mode, forward Spec add via Helix message to master: " + addedSpec); - HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, addedSpec.getUri().toString(), - UUID.randomUUID().toString(), InstanceType.CONTROLLER, helixManager.get(), _log); - return; - } - try { Properties jobConfig = new Properties(); Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties(); @@ -221,7 +210,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec); if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { - _log.info("Scheduling flow spec: " + addedSpec); + _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec); scheduleJob(jobConfig, null); if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, 
"false")) { _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec); @@ -232,7 +221,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata this.jobExecutor.execute(new NonScheduledJobRunner(jobConfig, null)); } } catch (JobException je) { - _log.error("Failed to schedule or run FlowSpec " + addedSpec, je); + _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je); } } } @@ -252,14 +241,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } _log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion); - if (!isActive && helixManager.isPresent()) { - _log.info("Scheduler running in slave mode, forward Spec delete via Helix message to master: " + deletedSpecURI); - HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, - deletedSpecURI.toString() + ":" + deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER, - helixManager.get(), _log); - return; - } - try { Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString()); if (null != deletedSpec) { @@ -292,13 +273,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata return; } - if (!isActive && helixManager.isPresent()) { - _log.info("Scheduler running in slave mode, forward Spec update via Helix message to master: " + updatedSpec); - HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, updatedSpec.getUri().toString(), - UUID.randomUUID().toString(), InstanceType.CONTROLLER, helixManager.get(), _log); - return; - } - try { onDeleteSpec(updatedSpec.getUri(), updatedSpec.getVersion()); } catch (Exception e) {
