Repository: incubator-gobblin Updated Branches: refs/heads/master b2dd7ddfc -> ccaa02c6e
[GOBBLIN-646] Refactor MultiHopFlowCompiler to use SpecExecutor configs from TopologySpecMap.[] Closes #2516 from sv2000/specExecutorsRefactor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ccaa02c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ccaa02c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ccaa02c6 Branch: refs/heads/master Commit: ccaa02c6e132a5731706a61620f04abd89be75fe Parents: b2dd7dd Author: suvasude <[email protected]> Authored: Fri Dec 7 09:01:05 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Dec 7 09:01:05 2018 -0800 ---------------------------------------------------------------------- .../modules/core/GitFlowGraphMonitor.java | 39 ++++++++++++-- .../modules/flow/MultiHopFlowCompiler.java | 2 +- .../service/modules/flowgraph/BaseFlowEdge.java | 27 ++++------ .../modules/flowgraph/FlowEdgeFactory.java | 7 ++- .../pathfinder/AbstractPathFinder.java | 9 ++-- .../modules/core/GitFlowGraphMonitorTest.java | 19 ++++--- .../modules/flow/MultiHopFlowCompilerTest.java | 54 ++++++++++++++++++-- .../flowgraph/BaseFlowEdgeFactoryTest.java | 21 +++++--- .../adls-1-to-adls-1-retention-1.properties | 6 +-- .../adls-1-to-adls-1-retention-2.properties | 6 +-- .../hdfs-1-to-hdfs-1-encrypt.properties | 6 +-- .../hdfs-1-to-hdfs-1-retention.properties | 6 +-- .../flowedges/hdfs-1-to-hdfs-3.properties | 6 +-- .../hdfs-2-hdfs-2-retention.properties | 6 +-- .../hdfs-2-to-hdfs-2-encrypt.properties | 6 +-- .../flowedges/hdfs-2-to-hdfs-4.properties | 6 +-- .../flowedges/hdfs-3-to-adls-1.properties | 7 +-- .../hdfs-3-to-hdfs-3-retention.properties | 6 +-- .../flowedges/hdfs-4-to-adls-1.properties | 6 +-- .../hdfs-4-to-hdfs-4-retention.properties | 6 +-- .../flowedges/local-to-hdfs-1.properties | 6 +-- .../flowedges/local-to-hdfs-2.properties | 6 +-- .../local-to-local-retention.properties | 6 +-- .../jobs/hdfs-encrypt-avro-to-json.job | 3 ++ .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job | 5 +- .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job | 3 ++ .../localToHdfs/jobs/distcp-local-to-hdfs.job | 3 ++ .../distcp-push-hdfs-to-adl.template | 11 +--- .../multihop/jobTemplates/distcp.template | 11 +--- .../topologyspec_catalog/azkaban01.properties | 2 + .../topologyspec_catalog/azkaban02.properties | 2 + .../topologyspec_catalog/azkaban03.properties | 2 + .../topologyspec_catalog/azkaban04.properties | 2 + .../topologyspec_catalog/local01.properties | 2 + .../testExecutor1.properties | 3 ++ .../testExecutor2.properties | 3 ++ 36 files changed, 174 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java index ed16c39..54544a1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -18,8 +18,12 @@ package org.apache.gobblin.service.modules.core; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -36,6 +40,8 @@ import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.modules.flowgraph.DataNode; import org.apache.gobblin.service.modules.flowgraph.FlowEdge; import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; @@ -82,12 +88,14 @@ public class GitFlowGraphMonitor extends GitMonitoringService { private FSFlowCatalog flowCatalog; private FlowGraph flowGraph; + private final Map<URI, TopologySpec> topologySpecMap; private final Config emptyConfig = ConfigFactory.empty(); - public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph) { + public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap) { super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK)); this.flowCatalog = flowCatalog; this.flowGraph = graph; + this.topologySpecMap = topologySpecMap; } /** @@ -203,11 +211,12 @@ public class GitFlowGraphMonitor extends GitMonitoringService { if (checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) { Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath()); try { - Config config = loadEdgeFileWithOverrides(edgeFilePath); - Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, + Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath); + List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig); + Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS)); - FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config); - FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog); + FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig); + FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog, specExecutors); if (!this.flowGraph.addFlowEdge(edge)) { log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId()); } else { @@ -304,12 +313,32 @@ public class GitFlowGraphMonitor extends GitMonitoringService { String source = edgeFilePath.getParent().getParent().getName(); String destination = edgeFilePath.getParent().getName(); String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName()); + return edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source)) .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination)) .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName))); } /** + * This method first retrieves the logical names of all the {@link org.apache.gobblin.runtime.api.SpecExecutor}s + * for this edge and returns the SpecExecutors from the {@link TopologySpec} map. + * @param edgeConfig containing the logical names of SpecExecutors for this edge. + * @return a {@link List<SpecExecutor>}s for this edge. + */ + private List<SpecExecutor> getSpecExecutors(Config edgeConfig) + throws URISyntaxException { + //Get the logical names of SpecExecutors where the FlowEdge can be executed. + List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY); + //Load all the SpecExecutor configurations for this FlowEdge from the SpecExecutor Catalog. + List<SpecExecutor> specExecutors = new ArrayList<>(); + for (String specExecutorName: specExecutorNames) { + URI specExecutorUri = new URI(specExecutorName); + specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor()); + } + return specExecutors; + } + + /** * Load the node file. * @param filePath path of the node file relative to the repository root * @return the configuration object http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 50da32a..c1b3e84 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -94,7 +94,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { throw new RuntimeException("Cannot instantiate " + getClass().getName(), e); } this.flowGraph = new BaseFlowGraph(); - this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph); + this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap); this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor)); addShutdownHook(); //Start the git flow graph monitor http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java index 56f6c1b..250fd57 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java @@ -17,23 +17,32 @@ package org.apache.gobblin.service.modules.flowgraph; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import joptsimple.internal.Strings; import lombok.Getter; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; import org.apache.gobblin.service.modules.template.FlowTemplate; +import org.apache.gobblin.util.PullFileLoader; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.util.ConfigUtils; @@ -128,7 +137,7 @@ public class BaseFlowEdge implements FlowEdge { * @return a {@link BaseFlowEdge} */ @Override - public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException { + public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException { try { String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ""); Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source"); @@ -138,30 +147,16 @@ public class BaseFlowEdge implements FlowEdge { String edgeId = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ""); Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeId), "A FlowEdge must have a non-null or empty Id"); - List<Config> specExecutorConfigList = new ArrayList<>(); - boolean flag; - for (int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); i++) { - specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); - } - String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, ""); //Perform basic validation Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points"); Preconditions - .checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor"); + .checkArgument(specExecutors.size() > 0, "A FlowEdge must have at least one SpecExecutor"); Preconditions .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty"); boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true); - //Build SpecExecutor from config - List<SpecExecutor> specExecutors = new ArrayList<>(); - - for (Config specExecutorConfig : specExecutorConfigList) { - Class executorClass = Class.forName(specExecutorConfig.getString(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY)); - SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig); - specExecutors.add(executor); - } FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateDirUri)); return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive); } catch (RuntimeException e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java index 2977231..3744bf0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java @@ -17,20 +17,23 @@ package org.apache.gobblin.service.modules.flowgraph; +import java.util.List; + import com.typesafe.config.Config; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; public interface FlowEdgeFactory { /** * Construct a {@link FlowEdge} from the edge properties * @param edgeProps properties of the {@link FlowEdge} - * @param catalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s + * @param flowCatalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s * useful for creating a {@link FlowEdge}. * @return an instance of {@link FlowEdge} * @throws FlowEdgeCreationException */ - public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException; + public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog, List<SpecExecutor> specExecutors) throws FlowEdgeCreationException; public class FlowEdgeCreationException extends Exception { private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java index f58a6d8..918f4a6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java @@ -181,7 +181,7 @@ public abstract class AbstractPathFinder implements PathFinder { boolean foundExecutor = false; //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template. for (SpecExecutor specExecutor : flowEdge.getExecutors()) { - Config mergedConfig = getMergedConfig(flowEdge, specExecutor); + Config mergedConfig = getMergedConfig(flowEdge); List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs = flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig); for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) { @@ -285,21 +285,18 @@ public abstract class AbstractPathFinder implements PathFinder { * <ul> * <p> the user provided flow config </p> * <p> edge specific properties/overrides </p> - * <p> spec executor config/overrides </p> * <p> source node config </p> * <p> destination node config </p> * </ul> * Each {@link JobTemplate}'s config will eventually be resolved against this merged config. * @param flowEdge An instance of {@link FlowEdge}. - * @param specExecutor A {@link SpecExecutor}. * @return the merged config derived as described above. */ - private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor) + private Config getMergedConfig(FlowEdge flowEdge) throws ExecutionException, InterruptedException { Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX); Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX); - Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig()) - .withFallback(srcNodeConfig).withFallback(destNodeConfig); + Config mergedConfig = flowConfig.withFallback(flowEdge.getConfig()).withFallback(srcNodeConfig).withFallback(destNodeConfig); return mergedConfig; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java index bd7d3cf..ab8af70 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -50,12 +51,14 @@ import com.typesafe.config.ConfigFactory; import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest; import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; import org.apache.gobblin.service.modules.flowgraph.DataNode; -import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; import org.apache.gobblin.service.modules.flowgraph.FlowEdge; import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; public class GitFlowGraphMonitorTest { @@ -96,6 +99,9 @@ public class GitFlowGraphMonitorTest { this.gitForPush.commit().setMessage("First commit").call(); this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + URI topologyCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI(); + Map<URI, TopologySpec> topologySpecMap = MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri); + this.config = ConfigBuilder.create() .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath()) @@ -116,7 +122,7 @@ public class GitFlowGraphMonitorTest { //Create a FlowGraph instance with defaults this.flowGraph = new BaseFlowGraph(); - this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph); + this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap); this.gitFlowGraphMonitor.setActive(true); } @@ -325,14 +331,7 @@ public class GitFlowGraphMonitorTest { + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n" + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "=testExecutor1,testExecutor2\n" + "key1=" + value + "\n"; return fileContents; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index 54792f9..6b51a77 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -25,8 +25,10 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.Future; @@ -38,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.errors.GitAPIException; import org.eclipse.jgit.lib.Repository; @@ -68,6 +71,7 @@ import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor; @@ -83,6 +87,7 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; import org.apache.gobblin.util.CompletedFuture; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -116,9 +121,11 @@ public class MultiHopFlowCompilerTest { } } + URI specExecutorCatalogUri = this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI(); + Map<URI, TopologySpec> topologySpecMap = buildTopologySpecMap(specExecutorCatalogUri); + //Create a FSFlowCatalog instance URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); - // Create a FSFlowCatalog instance Properties properties = new Properties(); properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); Config config = ConfigFactory.parseProperties(properties); @@ -127,7 +134,6 @@ public class MultiHopFlowCompilerTest { config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg); - //Add FlowEdges from the edge properties files URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI(); fs = FileSystem.get(flowEdgesURI, new Configuration()); @@ -139,13 +145,55 @@ public class MultiHopFlowCompilerTest { Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS)); FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config); - FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog); + List<String> specExecutorNames = ConfigUtils.getStringList(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY); + List<SpecExecutor> specExecutors = new ArrayList<>(); + for (String specExecutorName: specExecutorNames) { + specExecutors.add(topologySpecMap.get(new URI(specExecutorName)).getSpecExecutor()); + } + FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog, specExecutors); this.flowGraph.addFlowEdge(edge); } } this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph); } + /** + * A helper method to return a {@link TopologySpec} map, given a {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}. + * @param topologyCatalogUri pointing to the location of the {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog} + * @return a {@link TopologySpec} map. + */ + public static Map<URI, TopologySpec> buildTopologySpecMap(URI topologyCatalogUri) + throws IOException, URISyntaxException, ReflectiveOperationException { + FileSystem fs = FileSystem.get(topologyCatalogUri, new Configuration()); + PathFilter extensionFilter = file -> { + for (String extension : Lists.newArrayList(".properties")) { + if (file.getName().endsWith(extension)) { + return true; + } + } + return false; + }; + + Map<URI, TopologySpec> topologySpecMap = new HashMap<>(); + for (FileStatus fileStatus : fs.listStatus(new Path(topologyCatalogUri.getPath()), extensionFilter)) { + URI topologySpecUri = new URI(Files.getNameWithoutExtension(fileStatus.getPath().getName())); + Config topologyConfig = ConfigFactory.parseFile(new File(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString())); + Class specExecutorClass = Class.forName(topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_KEY)); + SpecExecutor specExecutor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(specExecutorClass, topologyConfig); + + TopologySpec.Builder topologySpecBuilder = TopologySpec + .builder(topologySpecUri) + .withConfig(topologyConfig) + .withDescription("") + .withVersion("1") + .withSpecExecutor(specExecutor); + + TopologySpec topologySpec = topologySpecBuilder.build(); + topologySpecMap.put(topologySpecUri, topologySpec); + } + return topologySpecMap; + } + private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination, boolean applyRetention, boolean applyRetentionOnInput) throws IOException, URISyntaxException { //Create a flow spec http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java index 2694f5c..085d0a7 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java @@ -18,14 +18,19 @@ package org.apache.gobblin.service.modules.flowgraph; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.util.ConfigUtils; import org.testng.Assert; import org.testng.annotations.Test; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.service.ServiceConfigKeys; @@ -43,12 +48,14 @@ public class BaseFlowEdgeFactoryTest { properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1"); properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1"); properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2"); + + List<SpecExecutor> specExecutorList = new ArrayList<>(); + Config config1 = ConfigFactory.empty().withValue("specStore.fs.dir", ConfigValueFactory.fromAnyRef("/tmp1")). + withValue("specExecInstance.capabilities", ConfigValueFactory.fromAnyRef("s1:d1")); + specExecutorList.add(new InMemorySpecExecutor(config1)); + Config config2 = ConfigFactory.empty().withValue("specStore.fs.dir", ConfigValueFactory.fromAnyRef("/tmp2")). + withValue("specExecInstance.capabilities", ConfigValueFactory.fromAnyRef("s2:d2")); + specExecutorList.add(new InMemorySpecExecutor(config2)); FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory(); @@ -61,7 +68,7 @@ public class BaseFlowEdgeFactoryTest { config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg); Config edgeProps = ConfigUtils.propertiesToConfig(properties); - FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog); + FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog, specExecutorList); Assert.assertEquals(flowEdge.getSrc(), "node1"); Assert.assertEquals(flowEdge.getDest(), "node2"); Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties index 52079d1..b01722c 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties @@ -2,8 +2,4 @@ flow.edge.source=ADLS-1 flow.edge.destination=ADLS-1 flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file +flow.edge.specExecutors=azkaban03 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties index 7b1a160..def8cb9 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties @@ -2,8 +2,4 @@ flow.edge.source=ADLS-1 flow.edge.destination=ADLS-1 flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file +flow.edge.specExecutors=azkaban04 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties index 44d3c44..110c665 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-1 flow.edge.destination=HDFS-1 flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava +flow.edge.specExecutors=azkaban01 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties index 926c51e..8e5b5ef 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-1 flow.edge.destination=HDFS-1 flow.edge.id=HDFS-1:HDFS-1:hdfsRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file +flow.edge.specExecutors=azkaban01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties index 897f003..5abd015 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties @@ -2,9 +2,5 @@ flow.edge.source=HDFS-1 flow.edge.destination=HDFS-3 flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava +flow.edge.specExecutors=azkaban01 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties index 26454e7..8b54a22 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-2 flow.edge.destination=HDFS-2 flow.edge.id=HDFS-2:HDFS-2:hdfsRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file +flow.edge.specExecutors=azkaban02 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties index db0ea48..24339ec 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-2 flow.edge.destination=HDFS-2 flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava +flow.edge.specExecutors=azkaban02 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties index 44f0408..268a414 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-2 flow.edge.destination=HDFS-4 flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava +flow.edge.specExecutors=azkaban02 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties index 68ca0ca..3471277 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties @@ -2,12 +2,7 @@ flow.edge.source=HDFS-3 flow.edge.destination=ADLS-1 flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava - +flow.edge.specExecutors=azkaban03 # Proxy config flow.edge.proxy.host=adl-proxy.linkedin.com flow.edge.proxy.port=1234 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties index f390546..f1d1adf 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-3 flow.edge.destination=HDFS-3 flow.edge.id=HDFS-3:HDFS-3:hdfsRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file +flow.edge.specExecutors=azkaban03 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties index 04f0885..9a08893 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties @@ -2,11 +2,7 @@ flow.edge.source=HDFS-4 flow.edge.destination=ADLS-1 flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava +flow.edge.specExecutors=azkaban04 # Proxy config flow.edge.proxy.host=adl-proxy.linkedin.com http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties index 6afb3d8..0a005c8 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties @@ -2,8 +2,4 @@ flow.edge.source=HDFS-4 flow.edge.destination=HDFS-4 flow.edge.id=HDFS-4:HDFS-4:hdfsRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443 -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE -flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file +flow.edge.specExecutors=azkaban04 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties index 268b67f..8a3809e 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties @@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1 flow.edge.destination=HDFS-1 flow.edge.id=LocalFS-1:HDFS-1:localToHdfs flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=fs:/// -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL -flow.edge.specExecutors.0.specExecInstance.job.type=java +flow.edge.specExecutors=local01 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties index bc67810..626a12f 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties @@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1 flow.edge.destination=HDFS-2 flow.edge.id=LocalFS-1:HDFS-2:localToHdfs flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=fs:/// -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL -flow.edge.specExecutors.0.specExecInstance.job.type=java \ No newline at end of file +flow.edge.specExecutors=local01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties index 76c77fd..c1f17eb 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties @@ -2,8 +2,4 @@ flow.edge.source=LocalFS-1 flow.edge.destination=LocalFS-1 flow.edge.id=LocalFS-1:LocalFS-1:localRetention flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor -flow.edge.specExecutors.0.specExecInstance.uri=fs:/// -flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher -flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL -flow.edge.specExecutors.0.specExecInstance.job.type=java \ No newline at end of file +flow.edge.specExecutors=local01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job index e0c8fa4..05fa8cd 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/jobs/hdfs-encrypt-avro-to-json.job @@ -1 +1,4 @@ gobblin.template.uri="FS:///multihop/jobTemplates/hdfs-convert-to-json-and-encrypt.template" +job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +launcher.type=MAPREDUCE +type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job index 429d4c0..779044b 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/jobs/distcp-hdfs-to-adl.job @@ -1 +1,4 @@ -gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template" \ No newline at end of file +gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-adl.template" +job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +launcher.type=MAPREDUCE +type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job index 2d1672c..13c067a 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job @@ -1 +1,4 @@ gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template" +job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +launcher.type=MAPREDUCE +type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job index 2d1672c..823b0a4 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/jobs/distcp-local-to-hdfs.job @@ -1 +1,4 @@ gobblin.template.uri="FS:///multihop/jobTemplates/distcp.template" +job.class=org.apache.gobblin.runtime.local.LocalJobLauncher +launcher.type=LOCAL +type=java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template index 26b2ed1..0b664dd 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template +++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-adl.template @@ -52,14 +52,5 @@ source.class="org.apache.gobblin.data.management.copy.CopySource" writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder" converter.classes="org.apache.gobblin.converter.IdentityConverter" -# ======================================= -# Job Parameters to be resolved using SpecExecutor properties -# ======================================= -type=${specExecInstance.job.type} - job.jars="lib/*" -job.lock.enabled=false -job.class=${specExecInstance.job.launcher.class} - -# Gobblin Hadoop Parameters -launcher.type=${specExecInstance.job.launcher.type} \ No newline at end of file +job.lock.enabled=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template index d9fcf6d..1a29262 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template +++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template @@ -43,14 +43,5 @@ source.class="org.apache.gobblin.data.management.copy.CopySource" writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder" converter.classes="org.apache.gobblin.converter.IdentityConverter" -# ======================================= -# Job Parameters to be resolved using SpecExecutor properties -# ======================================= -type=${specExecInstance.job.type} - job.jars="lib/*" -job.lock.enabled=false -job.class=${specExecInstance.job.launcher.class} - -# Gobblin Hadoop Parameters -launcher.type=${specExecInstance.job.launcher.type} \ No newline at end of file +job.lock.enabled=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties new file mode 100644 index 0000000..4bbb1d0 --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban01.properties @@ -0,0 +1,2 @@ +specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +specExecInstance.uri=https://azkaban01.gobblin.net:8443 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties new file mode 100644 index 0000000..1a8856b --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban02.properties @@ -0,0 +1,2 @@ +specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +specExecInstance.uri=https://azkaban02.gobblin.net:8443 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties new file mode 100644 index 0000000..6261c8b --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban03.properties @@ -0,0 +1,2 @@ +specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +specExecInstance.uri=https://azkaban03.gobblin.net:8443 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties new file mode 100644 index 0000000..12125fd --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/azkaban04.properties @@ -0,0 +1,2 @@ +specExecutorInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +specExecInstance.uri=https://azkaban04.gobblin.net:8443 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties b/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties new file mode 100644 index 0000000..db369de --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/local01.properties @@ -0,0 +1,2 @@ +specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor +specExecInstance.uri=fs:/// \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties new file mode 100644 index 0000000..7b67f9b --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor1.properties @@ -0,0 +1,3 @@ +specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor +specStore.fs.dir=/tmp1 +specExecInstance.capabilities=s1:d1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccaa02c6/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties new file mode 100644 index 0000000..449a076 --- /dev/null +++ b/gobblin-service/src/test/resources/topologyspec_catalog/testExecutor2.properties @@ -0,0 +1,3 @@ +specExecutorInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor +specStore.fs.dir=/tmp2 +specExecInstance.capabilities=s2:d2 \ No newline at end of file
