http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java new file mode 100644 index 0000000..5d0500c --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flow; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.Properties; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import com.typesafe.config.ConfigSyntax; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DataNode; +import org.apache.gobblin.service.modules.flowgraph.FlowEdge; +import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; +import org.apache.gobblin.service.modules.flowgraph.FlowGraph; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +@Slf4j +public class FlowGraphPathFinderTest { + private FlowGraph flowGraph; + private FlowGraphPathFinder pathFinder; + + @BeforeClass + public void setUp() + throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException { + //Create a FlowGraph + this.flowGraph = new BaseFlowGraph(); + + //Add DataNodes to the graph from the node properties files + URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI(); + FileSystem fs = FileSystem.get(dataNodesUri, new Configuration()); + Path dataNodesPath = new Path(dataNodesUri); + ConfigParseOptions options = ConfigParseOptions.defaults() + .setSyntax(ConfigSyntax.PROPERTIES) + .setAllowMissing(false); + + for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) { + try (InputStream is = fs.open(fileStatus.getPath())) { + Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options); + Class dataNodeClass = Class.forName(ConfigUtils + .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS)); + DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig); + this.flowGraph.addDataNode(dataNode); + } + } + + //Create a FSFlowCatalog instance + URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); + // Create a FSFlowCatalog instance + Properties properties = new Properties(); + properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); + Config config = ConfigFactory.parseProperties(properties); + Config templateCatalogCfg = config + .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg); + + + //Add FlowEdges from the edge properties files + URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI(); + fs = FileSystem.get(flowEdgesURI, new Configuration()); + Path flowEdgesPath = new Path(flowEdgesURI); + for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) { + log.warn(fileStatus.getPath().toString()); + try (InputStream is = fs.open(fileStatus.getPath())) { + Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options); + Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, + FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS)); + FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config); + FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog); + this.flowGraph.addFlowEdge(edge); + } + } + + //Create a flow spec + Properties flowProperties = new Properties(); + flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); + flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup"); + flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName"); + flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1"); + flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1"); + Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties); + + //Get the input/output dataset config from a file + URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI(); + Path flowConfigPath = new Path(flowConfigUri); + FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration()); + try (InputStream is = fs1.open(flowConfigPath)) { + Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset())); + flowConfig = flowConfig.withFallback(datasetConfig).resolve(); + } + + FlowSpec.Builder flowSpecBuilder = null; + flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri()) + .withConfig(flowConfig) + .withDescription("dummy description") + .withVersion(FlowSpec.Builder.DEFAULT_VERSION); + + FlowSpec spec = flowSpecBuilder.build(); + this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec); + } + + @Test + public void testFindPath() + throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException, + SpecNotFoundException, IOException { + Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag(); + Assert.assertEquals(jobDag.getNodes().size(), 4); + Assert.assertEquals(jobDag.getStartNodes().size(), 1); + Assert.assertEquals(jobDag.getEndNodes().size(), 1); + + //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1" + Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); + JobExecutionPlan jobSpecWithExecutor = startNode.getValue(); + JobSpec jobSpec = jobSpecWithExecutor.getJobSpec(); + + //Ensure the resolved job config for the first hop has the correct substitutions. + Config jobConfig = jobSpec.getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String from = jobConfig.getString("from"); + String to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/out/testTeam/testDataset"); + String sourceFsUri = jobConfig.getString("fs.uri"); + Assert.assertEquals(sourceFsUri, "file:///"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri); + Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri); + String targetFsUri = jobConfig.getString("target.filebased.fs.uri"); + Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri); + Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + Assert.assertEquals(jobConfig.getString("type"), "java"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL"); + //Ensure the spec executor has the correct configurations + SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); + + //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt" + Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); + Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); + jobSpecWithExecutor = secondHopNode.getValue(); + jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); + + //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3" + Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); + jobSpecWithExecutor = thirdHopNode.getValue(); + jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + //Ensure the spec executor has the correct configurations + specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); + + //Get the 4th hop - "Distcp from HDFS3 to ADLS-1" + Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); + jobSpecWithExecutor = fourthHopNode.getValue(); + jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234"); + Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential"); + Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password"); + //Ensure the spec executor has the correct configurations + specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); + + //Ensure the fourth hop is the last + Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); + } + + @Test (dependsOnMethods = "testFindPath") + public void testFindPathAfterFirstEdgeDeletion() + throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException, + SpecNotFoundException, IOException { + //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt. + this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt"); + + Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag(); + Assert.assertEquals(jobDag.getNodes().size(), 4); + Assert.assertEquals(jobDag.getStartNodes().size(), 1); + Assert.assertEquals(jobDag.getEndNodes().size(), 1); + + //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2" + Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); + JobExecutionPlan jobExecutionPlan = startNode.getValue(); + JobSpec jobSpec = jobExecutionPlan.getJobSpec(); + + //Ensure the resolved job config for the first hop has the correct substitutions. + Config jobConfig = jobSpec.getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String from = jobConfig.getString("from"); + String to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/out/testTeam/testDataset"); + String sourceFsUri = jobConfig.getString("fs.uri"); + Assert.assertEquals(sourceFsUri, "file:///"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri); + Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri); + String targetFsUri = jobConfig.getString("target.filebased.fs.uri"); + Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri); + Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + Assert.assertEquals(jobConfig.getString("type"), "java"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL"); + //Ensure the spec executor has the correct configurations + SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); + + //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt" + Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); + Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); + jobExecutionPlan = secondHopNode.getValue(); + jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); + + //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4" + Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); + jobExecutionPlan = thirdHopNode.getValue(); + jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + //Ensure the spec executor has the correct configurations + specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); + + //Get the 4th hop - "Distcp from HDFS4 to ADLS-1" + Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); + jobExecutionPlan = fourthHopNode.getValue(); + jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234"); + Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential"); + Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password"); + //Ensure the spec executor has the correct configurations + specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); + + //Ensure the fourth hop is the last + Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); + } + + @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion") + public void testFindPathAfterSecondEdgeDeletion() + throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException, + SpecNotFoundException, IOException { + //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt. + this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt"); + + //Ensure no path to destination. + Assert.assertNull(pathFinder.findPath()); + } + + @AfterClass + public void tearDown() { + } + + public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor { + // Executor Instance + protected final Config config; + + private SpecProducer<Spec> azkabanSpecProducer; + + public TestAzkabanSpecExecutor(Config config) { + super(config); + this.config = config; + } + + @Override + protected void startUp() throws Exception { + //Do nothing + } + + @Override + protected void shutDown() throws Exception { + //Do nothing + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); + } + + @Override + public Future<? extends SpecProducer> getProducer() { + return new CompletedFuture<>(this.azkabanSpecProducer, null); + } + + @Override + public Future<Config> getConfig() { + return new CompletedFuture<>(config, null); + } + + @Override + public Future<String> getHealth() { + return new CompletedFuture<>("Healthy", null); + } + + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java new file mode 100644 index 0000000..2694f5c --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.net.URI; +import java.util.Properties; + +import org.apache.gobblin.util.ConfigUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BaseFlowEdgeFactoryTest { + @Test + public void testCreateFlowEdge() throws Exception { + Properties properties = new Properties(); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "node1:node2:edge1"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, "FS:///flowEdgeTemplate"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2"); + properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2"); + + FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory(); + + Properties props = new Properties(); + URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); + props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); + Config config = ConfigFactory.parseProperties(props); + Config templateCatalogCfg = config + .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg); + Config edgeProps = ConfigUtils.propertiesToConfig(properties); + FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog); + Assert.assertEquals(flowEdge.getSrc(), "node1"); + Assert.assertEquals(flowEdge.getDest(), "node2"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java index 04f2270..be7b597 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java @@ -17,7 +17,6 @@ package org.apache.gobblin.service.modules.flowgraph; -import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; @@ -33,8 +32,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.service.modules.template.FlowTemplate; import org.apache.gobblin.service.modules.template.StaticFlowTemplate; import org.apache.gobblin.util.ConfigUtils; @@ -57,9 +54,7 @@ public class BaseFlowGraphTest { BaseFlowGraph graph; @BeforeClass - public void setUp() - throws URISyntaxException, ReflectiveOperationException, JobTemplate.TemplateException, SpecNotFoundException, - IOException, DataNode.DataNodeCreationException { + public void setUp() throws URISyntaxException, DataNode.DataNodeCreationException { Properties properties = new Properties(); properties.put("key1", "val1"); Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, @@ -81,9 +76,9 @@ public class BaseFlowGraphTest { //Create a clone of node3 node3c = new BaseDataNode(node3Config); - FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null); - FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null); - FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null); + FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null); + FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null); + FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null); //Create edge instances edgeId1 = "node1:node2:edge1"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java new file mode 100644 index 0000000..2542f5e --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.spec; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.io.Files; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.template.FlowTemplate; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.util.ConfigUtils; + + +public class JobExecutionPlanDagFactoryTest { + private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate"; + private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME; + private SpecExecutor specExecutor; + private List<JobTemplate> jobTemplates; + + @BeforeClass + public void setUp() throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException { + // Create a FSFlowCatalog instance + URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); + Properties properties = new Properties(); + properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); + Config config = ConfigFactory.parseProperties(properties); + Config templateCatalogCfg = config + .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg); + FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI)); + this.jobTemplates = flowTemplate.getJobTemplates(); + + //Create a spec executor instance + properties = new Properties(); + properties.put("specStore.fs.dir", "/tmp/testSpecStoreDir"); + properties.put("specExecInstance.capabilities", "source:destination"); + Config specExecutorConfig = ConfigUtils.propertiesToConfig(properties); + this.specExecutor = new InMemorySpecExecutor(specExecutorConfig); + } + + @Test + public void testCreateDag() throws Exception { + //Create a list of JobExecutionPlans + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + for (JobTemplate jobTemplate: this.jobTemplates) { + String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName()); + jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()). + withVersion("1").withTemplate(jobTemplate.getUri()).build(), specExecutor)); + } + + //Create a DAG from job execution plans. + Dag<JobExecutionPlan> dag = new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + + //Test DAG properties + Assert.assertEquals(dag.getStartNodes().size(), 1); + Assert.assertEquals(dag.getEndNodes().size(), 1); + Assert.assertEquals(dag.getNodes().size(), 4); + String startNodeName = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getUri()).getName(); + Assert.assertEquals(startNodeName, "job1"); + String templateUri = new Path(dag.getStartNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName(); + Assert.assertEquals(templateUri, "job1.job"); + String endNodeName = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getUri()).getName(); + Assert.assertEquals(endNodeName, "job4"); + templateUri = new Path(dag.getEndNodes().get(0).getValue().getJobSpec().getTemplateURI().get()).getName(); + Assert.assertEquals(templateUri, "job4.job"); + + Dag.DagNode<JobExecutionPlan> startNode = dag.getStartNodes().get(0); + List<Dag.DagNode<JobExecutionPlan>> nextNodes = dag.getChildren(startNode); + Set<String> nodeSet = new HashSet<>(); + for (Dag.DagNode<JobExecutionPlan> node: nextNodes) { + nodeSet.add(new Path(node.getValue().getJobSpec().getUri()).getName()); + Dag.DagNode<JobExecutionPlan> nextNode = dag.getChildren(node).get(0); + Assert.assertEquals(new Path(nextNode.getValue().getJobSpec().getUri()).getName(), "job4"); + } + Assert.assertTrue(nodeSet.contains("job2")); + Assert.assertTrue(nodeSet.contains("job3")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java deleted file mode 100644 index 58d879e..0000000 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactoryTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.template; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - - -public class JobTemplateDagFactoryTest { - private static final String TEST_TEMPLATE_NAME = "test-template"; - private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf"; - private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME; - FSFlowCatalog catalog; - - @BeforeClass - public void setUp() - throws URISyntaxException, IOException, SpecNotFoundException, JobTemplate.TemplateException { - URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); - - // Create a FSFlowCatalog instance - Properties properties = new Properties(); - properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); - Config config = ConfigFactory.parseProperties(properties); - Config templateCatalogCfg = config - .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, - config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); - this.catalog = new FSFlowCatalog(templateCatalogCfg); - } - - @Test - public void testCreateDagFromJobTemplates() throws Exception { - FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI)); - List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates(); - - //Create a DAG from job templates. - Dag<JobTemplate> jobTemplateDag = JobTemplateDagFactory.createDagFromJobTemplates(jobTemplates); - - //Test DAG properties - Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1); - Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1); - Assert.assertEquals(jobTemplateDag.getNodes().size(), 4); - String startNodeName = new Path(jobTemplateDag.getStartNodes().get(0).getValue().getUri()).getName(); - Assert.assertEquals(startNodeName, "job1.conf"); - String endNodeName = new Path(jobTemplateDag.getEndNodes().get(0).getValue().getUri()).getName(); - Assert.assertEquals(endNodeName, "job4.conf"); - - Dag.DagNode<JobTemplate> startNode = jobTemplateDag.getStartNodes().get(0); - List<Dag.DagNode<JobTemplate>> nextNodes = jobTemplateDag.getChildren(startNode); - Set<String> nodeSet = new HashSet<>(); - for(Dag.DagNode<JobTemplate> node: nextNodes) { - nodeSet.add(new Path(node.getValue().getUri()).getName()); - Dag.DagNode<JobTemplate> nextNode = jobTemplateDag.getChildren(node).get(0); - Assert.assertEquals(new Path(nextNode.getValue().getUri()).getName(), "job4.conf"); - } - Assert.assertTrue(nodeSet.contains("job2.conf")); - Assert.assertTrue(nodeSet.contains("job3.conf")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java index b20606f..3c8ebd3 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalogTest.java @@ -23,19 +23,20 @@ import java.util.Properties; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; import org.apache.commons.lang3.tuple.Pair; import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; -import org.apache.gobblin.service.modules.dataset.HdfsDatasetDescriptor; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor; +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; import org.apache.gobblin.service.modules.template.FlowTemplate; import org.testng.collections.Lists; @@ -43,9 +44,8 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class FSFlowCatalogTest { - private static final String TEST_TEMPLATE_NAME = "test-template"; - private static final String TEST_FLOW_CONF_FILE_NAME="flow.conf"; - private static final String TEST_TEMPLATE_URI = "FS:///" + TEST_TEMPLATE_NAME + "/" + TEST_FLOW_CONF_FILE_NAME; + private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate"; + private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + TEST_TEMPLATE_NAME; @Test public void testGetFlowTemplate() throws Exception { @@ -58,50 +58,45 @@ public class FSFlowCatalogTest { .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg); - FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_URI)); + FlowTemplate flowTemplate = catalog.getFlowTemplate(new URI(TEST_TEMPLATE_DIR_URI)); //Basic sanity check for the FlowTemplate - Dag<JobTemplate> jobTemplateDag = flowTemplate.getDag(); - List<Dag.DagNode<JobTemplate>> dagNodes = jobTemplateDag.getNodes(); - Assert.assertTrue(dagNodes.size() == 4); - Assert.assertEquals(jobTemplateDag.getStartNodes().size(), 1); - Assert.assertEquals(jobTemplateDag.getEndNodes().size(), 1); - Dag.DagNode<JobTemplate> dagNode = jobTemplateDag.getStartNodes().get(0); - URI startNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job1.conf").toURI(); - URI endNodeUri = this.getClass().getClassLoader().getResource("template_catalog/test-template/jobs/job4.conf").toURI(); - Assert.assertEquals(jobTemplateDag.getStartNodes().get(0).getValue().getUri(), startNodeUri); - Assert.assertEquals(jobTemplateDag.getEndNodes().get(0).getValue().getUri(), endNodeUri); List<JobTemplate> jobTemplates = flowTemplate.getJobTemplates(); Assert.assertEquals(jobTemplates.size(), 4); - for(int i=0; i<4; i++) { + for (int i = 0; i < 4; i++) { String uri = new Path(jobTemplates.get(i).getUri()).getName().split("\\.")[0]; String templateId = uri.substring(uri.length() - 1); - for(int j=0; j<2; j++) { + for (int j = 0; j < 2; j++) { Config jobTemplateConfig = jobTemplates.get(i).getRawTemplateConfig(); - String suffix = templateId + Integer.toString(j+1); + String suffix = templateId + Integer.toString(j + 1); Assert.assertEquals(jobTemplateConfig.getString("key" + suffix), "val" + suffix); } } - List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getInputOutputDatasetDescriptors(); + Config flowConfig = ConfigFactory.empty().withValue("team.name", ConfigValueFactory.fromAnyRef("test-team")) + .withValue("dataset.name", ConfigValueFactory.fromAnyRef("test-dataset")); + + List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = flowTemplate.getResolvingDatasetDescriptors(flowConfig); Assert.assertTrue(inputOutputDescriptors.size() == 2); List<String> dirs = Lists.newArrayList("inbound", "outbound"); - for(int i=0; i<2; i++) { - for (int j=0; j<2; j++) { - HdfsDatasetDescriptor datasetDescriptor; + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + FSDatasetDescriptor datasetDescriptor; if (j == 0) { - datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getLeft(); + datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getLeft(); } else { - datasetDescriptor = (HdfsDatasetDescriptor) inputOutputDescriptors.get(i).getRight(); + datasetDescriptor = (FSDatasetDescriptor) inputOutputDescriptors.get(i).getRight(); } Assert.assertEquals(datasetDescriptor.getPlatform(), "hdfs"); - Assert.assertEquals(datasetDescriptor.getFormat(), "avro"); - Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/<TEAM_NAME>/<DATASET_NAME>"); + Assert.assertEquals(datasetDescriptor.getFormatConfig().getFormat(), "avro"); + Assert.assertEquals(datasetDescriptor.getPath(), "/data/" + dirs.get(i) + "/test-team/test-dataset"); } } Config flowTemplateConfig = flowTemplate.getRawTemplateConfig(); - Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.input.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor"); - Assert.assertEquals(flowTemplateConfig.getString("gobblin.flow.dataset.descriptor.output.0.class"), "org.apache.gobblin.service.modules.dataset.BaseHdfsDatasetDescriptor"); + Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX + ".0." + + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName()); + Assert.assertEquals(flowTemplateConfig.getString(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX + + ".0." + DatasetDescriptorConfigKeys.CLASS_KEY), FSDatasetDescriptor.class.getCanonicalName()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flow/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flow/flow.conf b/gobblin-service/src/test/resources/flow/flow.conf new file mode 100644 index 0000000..f818df6 --- /dev/null +++ b/gobblin-service/src/test/resources/flow/flow.conf @@ -0,0 +1,24 @@ +team.name=testTeam +dataset.name=testDataset +user.to.proxy=testUser +adls.user.to.proxy=adlsTestUser +adls.oauth2.client.id=1234 +adls.ouath2.credential=credential + +#Input dataset - uncompressed and unencrypted +gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.input.dataset.descriptor.platform=hdfs +gobblin.flow.input.dataset.descriptor.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.input.dataset.descriptor.format=avro +gobblin.flow.input.dataset.descriptor.codec=NONE +gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE + +#Output dataset - compressed and encrypted +gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.output.dataset.descriptor.platform=adls +gobblin.flow.output.dataset.descriptor.path=/data/encrypted/${team.name}/${dataset.name} +gobblin.flow.output.dataset.descriptor.format=json +gobblin.flow.output.dataset.descriptor.codec=gzip +gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating +gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json +gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties new file mode 100644 index 0000000..a219e4f --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/datanodes/AdlsDataNode-1.properties @@ -0,0 +1,3 @@ +data.node.id=ADLS-1 +data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.AdlsDataNode +data.node.fs.uri=adl://azuredatalakestore.net/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties new file mode 100644 index 0000000..cad5e03 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-1.properties @@ -0,0 +1,3 @@ +data.node.id=HDFS-1 +data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode +data.node.fs.uri=hdfs://hadoopnn01.grid.linkedin.com:8888/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties new file mode 100644 index 0000000..eeb7980 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-2.properties @@ -0,0 +1,3 @@ +data.node.id=HDFS-2 +data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode +data.node.fs.uri=hdfs://hadoopnn02.grid.linkedin.com:8888/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties new file mode 100644 index 0000000..61135ba --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-3.properties @@ -0,0 +1,3 @@ +data.node.id=HDFS-3 +data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode +data.node.fs.uri=hdfs://hadoopnn03.grid.linkedin.com:8888/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties new file mode 100644 index 0000000..a772f1c --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/datanodes/HdfsDataNode-4.properties @@ -0,0 +1,3 @@ +data.node.id=HDFS-4 +data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode +data.node.fs.uri=hdfs://hadoopnn04.grid.linkedin.com:8888/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties new file mode 100644 index 0000000..6683221 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/datanodes/LocalFsDataNode-1.properties @@ -0,0 +1,3 @@ +data.node.id=LocalFS-1 +data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.LocalFSDataNode +data.node.fs.uri=file:/// \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties new file mode 100644 index 0000000..bcf6d44 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-1 +flow.edge.destination=HDFS-1 +flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties new file mode 100644 index 0000000..99d1ed7 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties @@ -0,0 +1,10 @@ +flow.edge.source=HDFS-1 +flow.edge.destination=HDFS-3 +flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties new file mode 100644 index 0000000..537cbfa --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-2 +flow.edge.destination=HDFS-2 +flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties new file mode 100644 index 0000000..6ec2ea5 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-2 +flow.edge.destination=HDFS-4 +flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties new file mode 100644 index 0000000..ed6e899 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-adls-1.properties @@ -0,0 +1,13 @@ +flow.edge.source=HDFS-3 +flow.edge.destination=ADLS-1 +flow.edge.id=HDFS-3:ADLS-1:hdfsToAdl +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava + +# Proxy config +flow.edge.proxy.host=adl-proxy.linkedin.com +flow.edge.proxy.port=1234 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties new file mode 100644 index 0000000..eae2767 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-adls-1.properties @@ -0,0 +1,13 @@ +flow.edge.source=HDFS-4 +flow.edge.destination=ADLS-1 +flow.edge.id=HDFS-4:ADLS-1:hdfsToAdl +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToAdl +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava + +# Proxy config +flow.edge.proxy.host=adl-proxy.linkedin.com +flow.edge.proxy.port=1234 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties new file mode 100644 index 0000000..268b67f --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-1.properties @@ -0,0 +1,9 @@ +flow.edge.source=LocalFS-1 +flow.edge.destination=HDFS-1 +flow.edge.id=LocalFS-1:HDFS-1:localToHdfs +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=fs:/// +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL +flow.edge.specExecutors.0.specExecInstance.job.type=java http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties new file mode 100644 index 0000000..bc67810 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-hdfs-2.properties @@ -0,0 +1,9 @@ +flow.edge.source=LocalFS-1 +flow.edge.destination=HDFS-2 +flow.edge.id=LocalFS-1:HDFS-2:localToHdfs +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/localToHdfs +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=fs:/// +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL +flow.edge.specExecutors.0.specExecInstance.job.type=java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java deleted file mode 100644 index 43fa9a3..0000000 --- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.core; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutionException; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.SystemUtils; -import org.eclipse.jgit.api.Git; -import org.eclipse.jgit.api.errors.GitAPIException; -import org.eclipse.jgit.dircache.DirCache; -import org.eclipse.jgit.lib.Repository; -import org.eclipse.jgit.lib.RepositoryCache; -import org.eclipse.jgit.revwalk.RevCommit; -import org.eclipse.jgit.transport.RefSpec; -import org.eclipse.jgit.util.FS; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; -import org.apache.gobblin.service.modules.flowgraph.DataNode; -import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; -import org.apache.gobblin.service.modules.flowgraph.FlowEdge; -import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; - - -public class GitFlowGraphMonitorTest { - private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class); - private Repository remoteRepo; - private Git gitForPush; - private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir"; - private final File remoteDir = new File(TEST_DIR + "/remote"); - private final File cloneDir = new File(TEST_DIR + "/clone"); - private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph"); - private static final String NODE_1_FILE = "node1.properties"; - private final File node1Dir = new File(flowGraphDir, "node1"); - private final File node1File = new File(node1Dir, NODE_1_FILE); - private static final String NODE_2_FILE = "node2.properties"; - private final File node2Dir = new File(flowGraphDir, "node2"); - private final File node2File = new File(node2Dir, NODE_2_FILE); - private final File edge1Dir = new File(node1Dir, "node2"); - private final File edge1File = new File(edge1Dir, "edge1.properties"); - - private RefSpec masterRefSpec = new RefSpec("master"); - private FSFlowCatalog flowCatalog; - private Config config; - private BaseFlowGraph flowGraph; - private GitFlowGraphMonitor gitFlowGraphMonitor; - - @BeforeClass - public void setUp() throws Exception { - cleanUpDir(TEST_DIR); - - // Create a bare repository - RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED); - this.remoteRepo = fileKey.open(false); - this.remoteRepo.create(true); - - this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call(); - - // push an empty commit as a base for detecting changes - this.gitForPush.commit().setMessage("First commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - - this.config = ConfigBuilder.create() - .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." - + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath()) - .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph") - .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5) - .build(); - - // Create a FSFlowCatalog instance - URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); - Properties properties = new Properties(); - properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); - Config config = ConfigFactory.parseProperties(properties); - Config templateCatalogCfg = config - .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, - config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); - this.flowCatalog = new FSFlowCatalog(templateCatalogCfg); - - //Create a FlowGraph instance with defaults - this.flowGraph = new BaseFlowGraph(); - - this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph); - this.gitFlowGraphMonitor.setActive(true); - } - - private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue) - throws IOException, GitAPIException { - // push a new node file - nodeDir.mkdirs(); - nodeFile.createNewFile(); - Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8); - - // add, commit, push node - this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call(); - this.gitForPush.commit().setMessage("Node commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - - this.gitFlowGraphMonitor.processGitConfigChanges(); - - //Check if node1 has been added to the FlowGraph - DataNode dataNode = this.flowGraph.getNode(nodeId); - Assert.assertEquals(dataNode.getId(), nodeId); - Assert.assertTrue(dataNode.isActive()); - Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue); - } - - @Test - public void testAddNode() - throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { - testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1"); - testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2"); - } - - @Test (dependsOnMethods = "testAddNode") - public void testAddEdge() - throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { - // push a new node file - this.edge1Dir.mkdirs(); - this.edge1File.createNewFile(); - - Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8); - - // add, commit, push - this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); - this.gitForPush.commit().setMessage("Edge commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - - this.gitFlowGraphMonitor.processGitConfigChanges(); - - //Check if edge1 has been added to the FlowGraph - Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); - Assert.assertEquals(edgeSet.size(), 1); - FlowEdge flowEdge = edgeSet.iterator().next(); - Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1"); - Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); - } - - @Test (dependsOnMethods = "testAddNode") - public void testUpdateEdge() - throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { - //Update edge1 file - Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n" - + "key1=value1\n", edge1File, Charsets.UTF_8); - - // add, commit, push - this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); - this.gitForPush.commit().setMessage("Edge commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - - this.gitFlowGraphMonitor.processGitConfigChanges(); - - //Check if new edge1 has been added to the FlowGraph - Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); - Assert.assertEquals(edgeSet.size(), 1); - FlowEdge flowEdge = edgeSet.iterator().next(); - Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1"); - Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1"); - } - - @Test (dependsOnMethods = "testUpdateEdge") - public void testUpdateNode() - throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { - //Update param1 value in node1 and check if updated node is added to the graph - testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3"); - } - - - @Test (dependsOnMethods = "testUpdateNode") - public void testRemoveEdge() throws GitAPIException, IOException { - // delete a config file - edge1File.delete(); - - //Node1 has 1 edge before delete - Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); - Assert.assertEquals(edgeSet.size(), 1); - - // delete, commit, push - DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), - this.edge1Dir.getName(), this.edge1File.getName())).call(); - RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - - this.gitFlowGraphMonitor.processGitConfigChanges(); - - //Check if edge1 has been deleted from the graph - edgeSet = this.flowGraph.getEdges("node1"); - Assert.assertTrue(edgeSet.size() == 0); - } - - @Test (dependsOnMethods = "testRemoveEdge") - public void testRemoveNode() throws GitAPIException, IOException { - //delete node file - node1File.delete(); - - //node1 is present in the graph before delete - DataNode node1 = this.flowGraph.getNode("node1"); - Assert.assertNotNull(node1); - - // delete, commit, push - DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); - RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - - this.gitFlowGraphMonitor.processGitConfigChanges(); - - //Check if node1 has been deleted from the graph - node1 = this.flowGraph.getNode("node1"); - Assert.assertNull(node1); - } - - - private void cleanUpDir(String dir) { - File specStoreDir = new File(dir); - - // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful - for (int i = 0; i < 5; i++) { - try { - if (specStoreDir.exists()) { - FileUtils.deleteDirectory(specStoreDir); - } - // if delete succeeded then break out of loop - break; - } catch (IOException e) { - logger.warn("Cleanup delete directory failed for directory: " + dir, e); - } - } - } - - private String formNodeFilePath(String groupDir, String fileName) { - return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; - } - - private String formEdgeFilePath(String parentDir, String groupDir, String fileName) { - return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; - } - - @AfterClass - public void tearDown() throws Exception { - cleanUpDir(TEST_DIR); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java deleted file mode 100644 index 9dd51a0..0000000 --- a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdgeFactoryTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.flowgraph; - -import java.net.URI; -import java.util.Properties; - -import org.apache.gobblin.util.ConfigUtils; -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BaseFlowEdgeFactoryTest { - @Test - public void testCreateFlowEdge() throws Exception { - Properties properties = new Properties(); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"node1"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "node2"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "edge1"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, "FS:///test-template/flow.conf"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specStore.fs.dir", "/tmp1"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".0.specExecInstance.capabilities", "s1:d1"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1."+FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY,"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specStore.fs.dir", "/tmp2"); - properties.put(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY+".1.specExecInstance.capabilities", "s2:d2"); - - FlowEdgeFactory flowEdgeFactory = new BaseFlowEdge.Factory(); - - Properties props = new Properties(); - URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); - props.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); - Config config = ConfigFactory.parseProperties(props); - Config templateCatalogCfg = config - .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, - config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); - FSFlowCatalog catalog = new FSFlowCatalog(templateCatalogCfg); - Config edgeProps = ConfigUtils.propertiesToConfig(properties); - FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeProps, catalog); - Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1"); - Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),"/tmp1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),"s1:d1"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),"/tmp2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),"s2:d2"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(),"InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(),"InMemorySpecExecutor"); - } -} \ No newline at end of file
