Repository: incubator-gobblin Updated Branches: refs/heads/master 6e4a2cecb -> 99fcd7f3c
[GOBBLIN-262] Make multihop compiler use the first template specified by user Closes #2114 from yukuai518/gaas Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/99fcd7f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/99fcd7f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/99fcd7f3 Branch: refs/heads/master Commit: 99fcd7f3c4ad7ca698f3aefe1b51a7a6d1b4b8ec Parents: 6e4a2ce Author: Kuai Yu <[email protected]> Authored: Fri Sep 22 12:58:10 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Fri Sep 22 12:58:10 2017 -0700 ---------------------------------------------------------------------- .../flow/MultiHopsFlowToJobSpecCompiler.java | 45 ++++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99fcd7f3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java index 456f3a3..96d41e5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java @@ -30,11 +30,14 @@ import java.util.stream.Collectors; import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.service.modules.policy.ServicePolicy; import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; import org.jgrapht.graph.DirectedWeightedMultigraph; import org.slf4j.Logger; import org.apache.gobblin.runtime.api.FlowEdge; @@ -188,7 +191,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { FlowEdge tmpFlowEdge = resultEdgePath.get(i); ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getSourceNode(); ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getTargetNode(); - specExecutorInstanceMap.put(jobSpecGenerator(edgeSrcNode, edgeTgtNode, flowSpec), + specExecutorInstanceMap.put(convertHopToJobSpec(edgeSrcNode, edgeTgtNode, flowSpec), ((LoadBasedFlowEdgeImpl) (resultEdgePath.get(i))).getSpecExecutorInstance()); } } @@ -217,7 +220,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i + 1)); if (weightedGraph.containsVertex(sourceNode) && weightedGraph.containsVertex(targetNode) && weightedGraph.containsEdge(sourceNode, targetNode)) { - tmpSpecExecutorInstanceMap.put(jobSpecGenerator(sourceNode, targetNode, flowSpec), + tmpSpecExecutorInstanceMap.put(convertHopToJobSpec(sourceNode, targetNode, flowSpec), (((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode, targetNode)).getSpecExecutorInstance())); } else { log.error("User Specified Path is invalid"); @@ -261,15 +264,13 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { /** * Generate JobSpec based on the #templateURI that user specified. */ - private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowEdge flowEdge, URI templateURI, - FlowSpec flowSpec) { + private JobSpec buildJobSpec (ServiceNode sourceNode, ServiceNode targetNode, URI templateURI, FlowSpec flowSpec) { JobSpec jobSpec; JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode)) .withConfig(flowSpec.getConfig()) .withDescription(flowSpec.getDescription()) .withVersion(flowSpec.getVersion()); - if (edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity()) && edgeTemplateMap.get(flowEdge.getEdgeIdentity()) - .contains(templateURI)) { + if (templateURI != null) { jobSpecBuilder.withTemplate(templateURI); try { jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get()); @@ -281,6 +282,27 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { jobSpec = jobSpecBuilder.build(); log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); } + + // Remove schedule + jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)); + + // Add job.name and job.group + if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) { + jobSpec.setConfig(jobSpec.getConfig() + .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY))); + } + if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) { + jobSpec.setConfig(jobSpec.getConfig() + .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY))); + } + + // Add flow execution id for this compilation + long flowExecutionId = System.currentTimeMillis(); + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, + ConfigValueFactory.fromAnyRef(flowExecutionId))); + + // Reset properties in Spec from Config + jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig())); return jobSpec; } @@ -289,12 +311,17 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { * Handle the case when edge is not specified. * Always select the first available template. */ - private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) { + private JobSpec convertHopToJobSpec (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) { FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode, targetNode).iterator().next(); + URI templateURI = getTemplateURI (sourceNode, targetNode, flowSpec, flowEdge); + return buildJobSpec(sourceNode, targetNode, templateURI, flowSpec); + } + + private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec, FlowEdge flowEdge) { URI firstTemplateURI = (edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get( - flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getUri(); - return this.jobSpecGenerator(sourceNode, targetNode, flowEdge, firstTemplateURI, flowSpec); + flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getTemplateURI().orNull(); + return firstTemplateURI; } /**
