Repository: incubator-gobblin Updated Branches: refs/heads/master c47f43dc3 -> 7af61741c
[GOBBLIN-495] Remove flowSpec when it is a runOnce case Closes #2366 from yukuai518/flowSpec Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7af61741 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7af61741 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7af61741 Branch: refs/heads/master Commit: 7af61741c0206c87052b2e769c245ceb21673d97 Parents: c47f43d Author: Kuai Yu <[email protected]> Authored: Fri May 18 13:48:51 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri May 18 13:48:51 2018 -0700 ---------------------------------------------------------------------- .../service/FlowConfigsResourceHandler.java | 8 ++-- ...GobblinServiceFlowConfigResourceHandler.java | 45 +++++++++++++------- .../scheduler/GobblinServiceJobScheduler.java | 27 +++++++----- 3 files changed, 51 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7af61741/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java index b92ab9d..55850f6 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java @@ -25,22 
+25,22 @@ import com.linkedin.restli.server.UpdateResponse; public interface FlowConfigsResourceHandler { /** - * Get flow config + * Get {@link FlowConfig} */ FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException; /** - * Add flow config can be done locally only iff current node is a master + * Add {@link FlowConfig} */ CreateResponse createFlowConfig(FlowConfig flowConfig) throws FlowConfigLoggedException; /** - * Update flow config can be done locally only iff current node is a master + * Update {@link FlowConfig} */ UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws FlowConfigLoggedException; /** - * Delete flow config can be done locally only iff current node is a master + * Delete {@link FlowConfig} */ UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) throws FlowConfigLoggedException; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7af61741/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java index 55d9cf0..d0e7737 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java @@ -45,9 +45,9 @@ import org.apache.gobblin.service.modules.utils.HelixUtils; /** - * A high available flow config resource handler which consider the leadership change. - * When a non-master status detected, it will forward the rest-li request - * to the master node. Otherwise it will handle it locally. 
+ * An HA (high-availability) aware {@link FlowConfigsResourceHandler} which considers whether the current node is Active or Standby. + * When Standby mode is detected, it will forward the rest-li request ({@link FlowConfig}) + * to the Active node. Otherwise it will handle it locally. */ @Slf4j public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResourceHandler { @@ -76,11 +76,16 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou } /** - * Method to handle create Restli request. - * In load balance mode, we will handle flowCatalog I/O locally before forwarding the message to Helix (Active) node. - * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle flowCatalog I/O. + * Adding {@link FlowConfig} should check if current node is active (master). + * If current node is active, call {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)} directly. + * If current node is standby, forward {@link ServiceConfigKeys#HELIX_FLOWSPEC_ADD} to active. The remote active will + * then call {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. * - * The listeners of {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in balance mode. + * Please refer to {@link org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory} for remote handling. + * + * For better I/O load balance, user can enable {@link GobblinServiceFlowConfigResourceHandler#flowCatalogLocalCommit}. + * The {@link FlowConfig} will be then persisted to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} first before it is + * forwarded to active node (if current node is standby) for execution. */ @Override public CreateResponse createFlowConfig(FlowConfig flowConfig) @@ -111,11 +116,16 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou } /** - * Method to handle update Restli request. 
- * In load balance mode, we will handle flowCatalog I/O locally before forwarding the message to Helix (Active) node. - * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle flowCatalog I/O. + * Updating {@link FlowConfig} should check if current node is active (master). + * If current node is active, call {@link FlowConfigResourceLocalHandler#updateFlowConfig(FlowId, FlowConfig)} directly. + * If current node is standby, forward {@link ServiceConfigKeys#HELIX_FLOWSPEC_UPDATE} to active. The remote active will + * then call {@link FlowConfigResourceLocalHandler#updateFlowConfig(FlowId, FlowConfig)}. + * + * Please refer to {@link org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory} for remote handling. * - * The listeners of {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in balance mode. + * For better I/O load balance, user can enable {@link GobblinServiceFlowConfigResourceHandler#flowCatalogLocalCommit}. + * The {@link FlowConfig} will be then persisted to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} first before it is + * forwarded to active node (if current node is standby) for execution. */ @Override public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) @@ -154,11 +164,16 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou } /** - * Method to handle delete Restli request. - * In load balance mode, we will handle flowCatalog I/O locally before forwarding the message to Helix (Active) node. - * Please refer to {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)}. It only handle flowCatalog I/O. + * Deleting {@link FlowConfig} should check if current node is active (master). + * If current node is active, call {@link FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)} directly. 
+ * If current node is standby, forward {@link ServiceConfigKeys#HELIX_FLOWSPEC_REMOVE} to active. The remote active will + * then call {@link FlowConfigResourceLocalHandler#deleteFlowConfig(FlowId, Properties)}. + * + * Please refer to {@link org.apache.gobblin.service.modules.core.ControllerUserDefinedMessageHandlerFactory} for remote handling. * - * The listeners of {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} won't be triggered in balance mode. + * For better I/O load balance, user can enable {@link GobblinServiceFlowConfigResourceHandler#flowCatalogLocalCommit}. + * The {@link FlowConfig} will be then persisted to {@link org.apache.gobblin.runtime.spec_catalog.FlowCatalog} first before it is + * forwarded to active node (if current node is standby) for execution. */ @Override public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7af61741/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 506a53e..1ec791a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -21,6 +21,8 @@ import java.net.URI; import java.util.Collection; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixManager; @@ -108,8 +110,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata // 
Since we are going to change status to isActive=true, schedule all flows if (isActive) { - // Need to set active first; otherwise in the STANDBY->ACTIVE transition, - // the onAddSpec will forward specs to the leader, which is itself. + // Need to set active=true first; otherwise in the onAddSpec(), node will forward specs to active node, which is itself. this.isActive = isActive; if (this.flowCatalog.isPresent()) { Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate(); @@ -129,8 +130,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata for (Spec spec : this.scheduledFlowSpecs.values()) { onDeleteSpec(spec.getUri(), spec.getVersion()); } - // Need to set active at the end; otherwise in the ACTIVE->STANDBY transition, - // the onDeleteSpec will forward specs to the leader, which is itself. + // Need to set active=false at the end; otherwise in the onDeleteSpec(), node will forward specs to active node, which is itself. this.isActive = isActive; } } @@ -193,14 +193,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata if (addedSpec instanceof FlowSpec) { try { + FlowSpec flowSpec = (FlowSpec) addedSpec; Properties jobConfig = new Properties(); Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties(); jobConfig.putAll(this.properties); jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, addedSpec.getUri().toString()); jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY, - ((FlowSpec) addedSpec).getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString()); + flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString()); jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, - ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")); + ConfigUtils.getString((flowSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")); if 
(flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils .isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) { jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, @@ -214,11 +215,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata scheduleJob(jobConfig, null); if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec); - this.jobExecutor.execute(new NonScheduledJobRunner(jobConfig, null)); + this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null)); } } else { _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec); - this.jobExecutor.execute(new NonScheduledJobRunner(jobConfig, null)); + this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null)); } } catch (JobException je) { _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je); @@ -321,19 +322,25 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata * This class is responsible for running non-scheduled jobs. 
*/ class NonScheduledJobRunner implements Runnable { - + private final URI specUri; private final Properties jobConfig; private final JobListener jobListener; + private final boolean removeSpec; - public NonScheduledJobRunner(Properties jobConfig, JobListener jobListener) { + public NonScheduledJobRunner(URI uri, boolean removeSpec, Properties jobConfig, JobListener jobListener) { + this.specUri = uri; this.jobConfig = jobConfig; this.jobListener = jobListener; + this.removeSpec = removeSpec; } @Override public void run() { try { GobblinServiceJobScheduler.this.runJob(this.jobConfig, this.jobListener); + if (flowCatalog.isPresent() && removeSpec) { + GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false); + } } catch (JobException je) { _log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); }
