[GOBBLIN-404] Disable immediate execution of all flows in FlowCatalog on Gobblin Service restart
Closes #2279 from sv2000/gaasScheduler Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/94bcc169 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/94bcc169 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/94bcc169 Branch: refs/heads/0.12.0 Commit: 94bcc1694ae5575cc1dcfcba12b0efab3ec8ac4e Parents: de83a3f Author: suvasude <[email protected]> Authored: Mon Feb 5 11:40:08 2018 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Feb 5 11:40:08 2018 -0800 ---------------------------------------------------------------------- .../scheduler/GobblinServiceJobScheduler.java | 67 ++++++++++++++------ .../GobblinServiceJobSchedulerTest.java | 59 +++++++++++++++++ 2 files changed, 106 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bcc169/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 5c26445..9cb39fb 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -23,8 +23,10 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; @@ -37,9 +39,11 @@ import 
org.quartz.UnableToInterruptJobException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -77,9 +81,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata @Getter protected volatile boolean isActive; - public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, - Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, SchedulerService schedulerService, - Optional<Logger> log) throws Exception { + public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, + Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, + SchedulerService schedulerService, Optional<Logger> log) + throws Exception { super(ConfigUtils.configToProperties(config), schedulerService); _log = log.isPresent() ? 
log.get() : LoggerFactory.getLogger(getClass()); @@ -90,13 +95,14 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata this.scheduledFlowSpecs = Maps.newHashMap(); } - public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, - Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService, Optional<Logger> log) throws Exception { + public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, + Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService, + Optional<Logger> log) + throws Exception { this(config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, log), schedulerService, log); } - public synchronized void setActive(boolean isActive) { if (this.isActive == isActive) { // No-op if already in correct state @@ -111,7 +117,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata if (this.flowCatalog.isPresent()) { Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate(); for (Spec spec : specs) { - onAddSpec(spec); + //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change + if (spec instanceof FlowSpec) { + Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec); + onAddSpec(modifiedSpec); + } else { + onAddSpec(spec); + } } } } @@ -126,8 +138,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } } + @VisibleForTesting + protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) { + Properties properties = spec.getConfigAsProperties(); + properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"); + Config config = ConfigFactory.parseProperties(properties); + FlowSpec flowSpec = new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties, + spec.getTemplateURIs(), 
spec.getChildSpecs()); + return flowSpec; + } + @Override - protected void startUp() throws Exception { + protected void startUp() + throws Exception { super.startUp(); } @@ -135,7 +158,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads. */ @Override - public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException { + public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) + throws JobException { Map<String, Object> additionalJobDataMap = Maps.newHashMap(); additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC, this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY))); @@ -148,7 +172,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } @Override - public void runJob(Properties jobProps, JobListener jobListener) throws JobException { + public void runJob(Properties jobProps, JobListener jobListener) + throws JobException { try { Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); this.orchestrator.orchestrate(flowSpec); @@ -185,10 +210,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY, ((FlowSpec) addedSpec).getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString()); jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, - ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,"false")); - if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) - && StringUtils.isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) { - jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)); + 
ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")); + if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils + .isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) { + jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, + flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)); } this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec); @@ -223,8 +249,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata if (!isActive && helixManager.isPresent()) { _log.info("Scheduler running in slave mode, forward Spec delete via Helix message to master: " + deletedSpecURI); - HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, deletedSpecURI.toString() + ":" + - deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER, helixManager.get(), _log); + HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, + deletedSpecURI.toString() + ":" + deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER, + helixManager.get(), _log); return; } @@ -235,12 +262,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata this.scheduledFlowSpecs.remove(deletedSpecURI.toString()); unscheduleJob(deletedSpecURI.toString()); } else { - _log.warn(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please " - + "clean it manually", deletedSpecURI)); + _log.warn(String.format( + "Spec with URI: %s was not found in cache. 
May be it was cleaned, if not please " + "clean it manually", + deletedSpecURI)); } } catch (JobException e) { - _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", - deletedSpecURI), e); + _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", deletedSpecURI), e); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bcc169/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java new file mode 100644 index 0000000..a6e1bc5 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.scheduler; + +import java.net.URI; +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; + + +public class GobblinServiceJobSchedulerTest { + private static final String TEST_GROUP_NAME = "testGroup"; + private static final String TEST_FLOW_NAME = "testFlow"; + private static final String TEST_SCHEDULE = "0 1/0 * ? * *"; + private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template"; + + @Test + public void testDisableFlowRunImmediatelyOnStart() + throws Exception { + Properties properties = new Properties(); + properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "true"); + properties.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, TEST_SCHEDULE); + properties.setProperty(ConfigurationKeys.JOB_GROUP_KEY, TEST_GROUP_NAME); + properties.setProperty(ConfigurationKeys.JOB_NAME_KEY, TEST_FLOW_NAME); + Config config = ConfigFactory.parseProperties(properties); + FlowSpec spec = FlowSpec.builder().withTemplate(new URI(TEST_TEMPLATE_URI)).withVersion("version") + .withConfigAsProperties(properties).withConfig(config).build(); + FlowSpec modifiedSpec = (FlowSpec) GobblinServiceJobScheduler.disableFlowRunImmediatelyOnStart(spec); + for (URI templateURI : modifiedSpec.getTemplateURIs().get()) { + Assert.assertEquals(templateURI.toString(), TEST_TEMPLATE_URI); + } + Assert.assertEquals(modifiedSpec.getVersion(), "version"); + Config modifiedConfig = modifiedSpec.getConfig(); + Assert.assertFalse(modifiedConfig.getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)); + Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_GROUP_KEY), TEST_GROUP_NAME); + Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME); + } +} \ No 
newline at end of file
