http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java new file mode 100644 index 0000000..c6f5c74 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.flowgraph.datanodes.fs; + +import java.net.URI; + +import com.google.common.base.Strings; +import com.typesafe.config.Config; + + +/** + * An implementation of an ADL (Azure Data Lake) {@link org.apache.gobblin.service.modules.flowgraph.DataNode}. 
+ */ +public class AdlsDataNode extends FileSystemDataNode { + public static final String ADLS_SCHEME = "adl"; + + public AdlsDataNode(Config nodeProps) throws DataNodeCreationException { + super(nodeProps); + } + + /** + * @param fsUri FileSystem URI + * @return true if the scheme is "adl" and authority is not empty. + */ + @Override + public boolean isUriValid(URI fsUri) { + String scheme = fsUri.getScheme(); + //Check that the scheme is "adl" + if (!scheme.equals(ADLS_SCHEME)) { + return false; + } + //Ensure that the authority is not empty + if (Strings.isNullOrEmpty(fsUri.getAuthority())) { + return false; + } + return true; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java new file mode 100644 index 0000000..72f1a66 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flowgraph.datanodes.fs; + +import java.io.IOException; +import java.net.URI; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.modules.flowgraph.BaseDataNode; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.util.ConfigUtils; + +import joptsimple.internal.Strings; +import lombok.Getter; + + +/** + * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode} + * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}. + */ +@Alpha +public abstract class FileSystemDataNode extends BaseDataNode { + public static final String FS_URI_KEY = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri"; + + @Getter + private String fsUri; + + /** + * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id. + */ + public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException { + super(nodeProps); + try { + this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, ""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "fs.uri cannot be null or empty."); + + //Validate the srcFsUri and destFsUri of the DataNode. + if (!isUriValid(new URI(this.fsUri))) { + throw new IOException("Invalid FS URI " + this.fsUri); + } + } catch (Exception e) { + throw new DataNodeCreationException(e); + } + } + + public abstract boolean isUriValid(URI fsUri); + /** + * Two HDFS DataNodes are the same if they have the same id and the same fsUri. 
+ */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileSystemDataNode that = (FileSystemDataNode) o; + + return this.getId().equals(that.getId()) && this.fsUri.equals(that.getFsUri()); + } + + @Override + public int hashCode() { + return Joiner.on("-").join(this.getId(), this.fsUri).hashCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java new file mode 100644 index 0000000..5402074 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flowgraph.datanodes.fs; + +import java.net.URI; + +import com.google.common.base.Strings; +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; + + +/** + * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri) + * are validated here. + */ +@Alpha +public class HdfsDataNode extends FileSystemDataNode { + public static final String HDFS_SCHEME = "hdfs"; + + public HdfsDataNode(Config nodeProps) throws DataNodeCreationException { + super(nodeProps); + } + + /** + * + * @param fsUri FileSystem URI + * @return true if the scheme is "hdfs" and authority is not empty. + */ + @Override + public boolean isUriValid(URI fsUri) { + String scheme = fsUri.getScheme(); + //Check that the scheme is "hdfs" + if (!scheme.equals(HDFS_SCHEME)) { + return false; + } + //Ensure that the authority is not empty + if (Strings.isNullOrEmpty(fsUri.getAuthority())) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java new file mode 100644 index 0000000..757d4a0 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph.datanodes.fs; + +import java.net.URI; + +import org.apache.gobblin.annotation.Alpha; + +import com.typesafe.config.Config; + +/** + * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri) + * are validated here. + */ +@Alpha +public class LocalFSDataNode extends FileSystemDataNode { + public static final String LOCAL_FS_SCHEME = "file"; + + public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException { + super(nodeProps); + } + + /** + * + * @param fsUri FileSystem URI + * @return true if the scheme of fsUri equals "file" + */ + @Override + public boolean isUriValid(URI fsUri) { + String scheme = fsUri.getScheme(); + if (scheme.equals(LOCAL_FS_SCHEME)) { + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java new file mode 100644 index 0000000..c0c9297 --- /dev/null +++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.spec; + +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.base.Joiner; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * A data class that encapsulates information for executing a job. This includes a {@link JobSpec} and a {@link SpecExecutor} + * where the {@link JobSpec} will be executed. 
+ */ +@Data +@AllArgsConstructor +public class JobExecutionPlan { + private JobSpec jobSpec; + private SpecExecutor specExecutor; + + public static class Factory { + + public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, SpecExecutor specExecutor, Long flowExecutionId) + throws URISyntaxException { + JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId); + return new JobExecutionPlan(jobSpec, specExecutor); + } + + /** + * Given a resolved job config, this helper method converts the config to a {@link JobSpec}. + * @param jobConfig resolved job config. + * @param flowSpec input FlowSpec. + * @return a {@link JobSpec} corresponding to the resolved job config. + */ + private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long flowExecutionId) throws URISyntaxException { + Config flowConfig = flowSpec.getConfig(); + + String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, ""); + String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, ""); + String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); + + //Modify the job name to include the flow group:flow name. 
+ jobName = Joiner.on(":").join(flowGroup, flowName, jobName); + + JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig) + .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion()); + + //Get job template uri + URI jobTemplateUri = new URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH)); + JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build(); + + //Add flowName to job spec + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))); + + //Add job name + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName))); + + //Add flow execution id + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))); + + // Remove schedule + jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)); + + // Add job.name and job.group + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName))); + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))); + + //Enable job lock for each job to prevent concurrent executions. + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))); + + // Reset properties in Spec from Config + jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig())); + + return jobSpec; + } + + + /** + * A naive implementation of generating a jobSpec's URI within a multi-hop flow that follows the convention: + * <JOB_CATALOG_SCHEME>/{@link ConfigurationKeys#JOB_GROUP_KEY}/{@link ConfigurationKeys#JOB_NAME_KEY}. 
+ */ + public static URI jobSpecURIGenerator(String jobGroup, String jobName, FlowSpec flowSpec) + throws URISyntaxException { + return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, flowSpec.getUri().getAuthority(), + StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"), "/") + jobGroup + + "/" + jobName, null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java new file mode 100644 index 0000000..f942f8d --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.spec; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Optional; +import com.google.common.io.Files; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; + + +/** + * A Factory class used for constructing a {@link Dag} of {@link JobExecutionPlan}s from + * a {@link List} of {@link JobExecutionPlan}s. + */ +@Alpha +@Slf4j +public class JobExecutionPlanDagFactory { + + public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans) { + //Maintain a mapping between job name and the corresponding JobExecutionPlan. + Map<String, Dag.DagNode<JobExecutionPlan>> JobExecutionPlanMap = new HashMap<>(); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + /** + * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link JobSpec} in the flow. Add this node + * to a HashMap. + */ + for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) { + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + String jobName = getJobName(jobExecutionPlan); + if (jobName != null) { + JobExecutionPlanMap.put(jobName, dagNode); + } + } + + /** + * Iterate over each {@link JobSpec} to get the dependencies of each {@link JobSpec}. + * For each {@link JobSpec}, get the corresponding {@link Dag.DagNode} and + * set the {@link Dag.DagNode}s corresponding to its dependencies as its parent nodes. + * + * TODO: we likely do not need 2 for loops and we can do this in 1 pass. 
+ */ + for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) { + String jobName = getJobName(jobExecutionPlan); + if (jobName == null) { + continue; + } + Dag.DagNode<JobExecutionPlan> node = JobExecutionPlanMap.get(jobName); + Collection<String> dependencies = getDependencies(jobExecutionPlan.getJobSpec().getConfig()); + for (String dependency : dependencies) { + Dag.DagNode<JobExecutionPlan> parentNode = JobExecutionPlanMap.get(dependency); + node.addParentNode(parentNode); + } + } + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + return dag; + } + + /** + * Get job dependencies of a given job from its config. + * @param config of a job. + * @return a list of dependencies of the job. + */ + private static List<String> getDependencies(Config config) { + return config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays + .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>(); + } + + /** + * The job name is derived from the {@link org.apache.gobblin.runtime.api.JobTemplate} URI. It is the + * simple name of the path component of the URI. + * @param jobExecutionPlan + * @return the simple name from the URI path. 
+ */ + private static String getJobName(JobExecutionPlan jobExecutionPlan) { + Optional<URI> jobTemplateUri = jobExecutionPlan.getJobSpec().getTemplateURI(); + if (jobTemplateUri.isPresent()) { + return Files.getNameWithoutExtension(new Path(jobTemplateUri.get()).getName()); + } else { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java index 30d8309..a8350fd 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java @@ -20,16 +20,16 @@ package org.apache.gobblin.service.modules.template; import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.Map; + +import org.apache.commons.lang3.tuple.Pair; import com.typesafe.config.Config; -import org.apache.commons.lang3.tuple.Pair; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; -import org.apache.gobblin.service.modules.flowgraph.Dag; /** * An interface primarily for representing a flow of {@link JobTemplate}s. It also has @@ -45,18 +45,37 @@ public interface FlowTemplate extends Spec { /** * - * @return the {@link Dag<JobTemplate>} that backs the {@link FlowTemplate}. + * @return all configuration inside pre-written template. 
*/ - Dag<JobTemplate> getDag() throws IOException; + Config getRawTemplateConfig(); /** - * - * @return all configuration inside pre-written template. + * @param userConfig a list of user customized attributes. + * @return list of input/output {@link DatasetDescriptor}s that fully resolve the {@link FlowTemplate} using the + * provided userConfig. */ - Config getRawTemplateConfig(); + List<Pair<DatasetDescriptor, DatasetDescriptor>> getResolvingDatasetDescriptors(Config userConfig) + throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException; + + /** + * Checks if the {@link FlowTemplate} is resolvable using the provided {@link Config} object. A {@link FlowTemplate} + * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable + * @param userConfig User supplied Config + * @param inputDescriptor input {@link DatasetDescriptor} + * @param outputDescriptor output {@link DatasetDescriptor} + * @return true if the {@link FlowTemplate} is resolvable + */ + boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) + throws SpecNotFoundException, JobTemplate.TemplateException; /** - * @return list of input/output {@link DatasetDescriptor}s for the {@link FlowTemplate}. + * Resolves the {@link JobTemplate}s underlying this {@link FlowTemplate} and returns a {@link List} of resolved + * job {@link Config}s. + * @param userConfig User supplied Config + * @param inputDescriptor input {@link DatasetDescriptor} + * @param outputDescriptor output {@link DatasetDescriptor} + * @return a list of resolved job {@link Config}s. 
*/ - List<Pair<DatasetDescriptor, DatasetDescriptor>> getInputOutputDatasetDescriptors(); + List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) + throws SpecNotFoundException, JobTemplate.TemplateException; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java index 553f067..ba9c091 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.net.URISyntaxException; import com.google.common.base.Charsets; import com.typesafe.config.Config; @@ -43,15 +44,15 @@ public class HOCONInputStreamFlowTemplate extends StaticFlowTemplate { public static final String VERSION_KEY = "gobblin.flow.template.version"; private static final String DEFAULT_VERSION = "1"; - public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, FlowCatalogWithTemplates catalog) - throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException { + public HOCONInputStreamFlowTemplate(InputStream inputStream, URI flowTemplateDirUri, FlowCatalogWithTemplates catalog) + throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException { this(ConfigFactory.parseReader(new InputStreamReader(inputStream, 
Charsets.UTF_8)).resolve( - ConfigResolveOptions.defaults().setAllowUnresolved(true)), uri, catalog); + ConfigResolveOptions.defaults().setAllowUnresolved(true)), flowTemplateDirUri, catalog); } - public HOCONInputStreamFlowTemplate(Config config, URI uri, FlowCatalogWithTemplates catalog) - throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException { - super(uri, ConfigUtils.getString(config, VERSION_KEY, DEFAULT_VERSION), + public HOCONInputStreamFlowTemplate(Config config, URI flowTemplateDirUri, FlowCatalogWithTemplates catalog) + throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException { + super(flowTemplateDirUri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION, config.hasPath(ConfigurationKeys.FLOW_DESCRIPTION_KEY) ? config .getString(ConfigurationKeys.FLOW_DESCRIPTION_KEY) : "", config, catalog); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java deleted file mode 100644 index b89da4d..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.template; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.gobblin.annotation.Alpha; -import org.apache.hadoop.fs.Path; - -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.service.modules.flowgraph.Dag; - -import lombok.extern.slf4j.Slf4j; - - -/** - * A Factory class used for constructing a {@link Dag} of {@link org.apache.gobblin.runtime.api.JobTemplate}s from - * a {@link URI} of a {@link FlowTemplate}. - */ -@Alpha -@Slf4j -public class JobTemplateDagFactory { - public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf"; - - public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> jobTemplates) { - Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>(); - List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>(); - /** - * Create a {@link Dag.DagNode<JobTemplate>} for every {@link JobTemplate} in the flow. Add this node - * to a {@link Map<URI,JobTemplate>}. - */ - for (JobTemplate template : jobTemplates) { - Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template); - dagNodeList.add(dagNode); - uriJobTemplateMap.put(template.getUri(), dagNode); - } - - /** - * Iterate over each {@link JobTemplate} to get the dependencies of each {@link JobTemplate}. 
- * For each {@link JobTemplate}, get the corresponding {@link Dag.DagNode} and - * set the {@link Dag.DagNode}s corresponding to the dependencies as its parent nodes. - * - * TODO: we likely do not need 2 for loops and we can do this in 1 pass. - */ - Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent(); - for (JobTemplate template : jobTemplates) { - URI templateUri = template.getUri(); - Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri); - Collection<String> dependencies = template.getDependencies(); - for (String dependency : dependencies) { - URI dependencyUri = new Path(templateDirPath, dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri(); - Dag.DagNode<JobTemplate> parentNode = uriJobTemplateMap.get(dependencyUri); - node.addParentNode(parentNode); - } - } - Dag<JobTemplate> dag = new Dag<>(dagNodeList); - return dag; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java index 46f99d3..5f8dfda 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java @@ -19,38 +19,40 @@ package org.apache.gobblin.service.modules.template; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.typesafe.config.Config; 
+import com.typesafe.config.ConfigResolveOptions; +import com.typesafe.config.ConfigValueFactory; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; -import org.apache.hadoop.fs.Path; - -import lombok.Getter; /** * A {@link FlowTemplate} using a static {@link Config} as the raw configuration for the template. */ @Alpha +@Slf4j public class StaticFlowTemplate implements FlowTemplate { private static final long serialVersionUID = 84641624233978L; - public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.input"; - public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.output"; - public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class"; - @Getter private URI uri; @Getter @@ -60,69 +62,74 @@ public class StaticFlowTemplate implements FlowTemplate { @Getter private transient FlowCatalogWithTemplates catalog; @Getter - private List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors; - @Getter private List<JobTemplate> jobTemplates; - private transient Dag<JobTemplate> dag; - private transient Config rawConfig; - private boolean isTemplateMaterialized; - public StaticFlowTemplate(URI uri, String version, String description, Config config, + public StaticFlowTemplate(URI flowTemplateDirUri, 
String version, String description, Config config, FlowCatalogWithTemplates catalog) - throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException { - this.uri = uri; + throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { + this.uri = flowTemplateDirUri; this.version = version; this.description = description; - this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config); this.rawConfig = config; this.catalog = catalog; - URI flowTemplateDir = new Path(this.uri).getParent().toUri(); - this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir); + this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDirUri); } //Constructor for testing purposes - public StaticFlowTemplate(URI uri, String version, String description, Config config, - FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> jobTemplates) - throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException { + public StaticFlowTemplate(URI uri, String version, String description, Config config, FlowCatalogWithTemplates catalog, List<JobTemplate> jobTemplates) { this.uri = uri; this.version = version; this.description = description; - this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors; this.rawConfig = config; this.catalog = catalog; this.jobTemplates = jobTemplates; } + /** * Generate the input/output dataset descriptors for the {@link FlowTemplate}. + * @param userConfig + * @return a List of Input/Output DatasetDescriptors that resolve this {@link FlowTemplate}. 
*/ - private List<Pair<DatasetDescriptor, DatasetDescriptor>> buildInputOutputDescriptors(Config config) - throws IOException, ReflectiveOperationException { - if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || !config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) { + @Override + public List<Pair<DatasetDescriptor, DatasetDescriptor>> getResolvingDatasetDescriptors(Config userConfig) + throws IOException, SpecNotFoundException, JobTemplate.TemplateException { + Config config = this.getResolvedFlowConfig(userConfig).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)); + + if (!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX) + || !config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX)) { throw new IOException("Flow template must specify at least one input/output dataset descriptor"); } + int i = 0; - String inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i)); + String inputPrefix = Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i)); List<Pair<DatasetDescriptor, DatasetDescriptor>> result = Lists.newArrayList(); while (config.hasPath(inputPrefix)) { - Config inputDescriptorConfig = config.getConfig(inputPrefix); - DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig); - String outputPrefix = Joiner.on(".").join(OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i++)); - Config outputDescriptorConfig = config.getConfig(outputPrefix); - DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig); - result.add(ImmutablePair.of(inputDescriptor, outputDescriptor)); - inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i)); + try { + Config inputDescriptorConfig = config.getConfig(inputPrefix); + DatasetDescriptor inputDescriptor = getDatasetDescriptor(inputDescriptorConfig); + String outputPrefix = Joiner.on(".") + 
.join(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i)); + Config outputDescriptorConfig = config.getConfig(outputPrefix); + DatasetDescriptor outputDescriptor = getDatasetDescriptor(outputDescriptorConfig); + + if (isResolvable(userConfig, inputDescriptor, outputDescriptor)) { + result.add(ImmutablePair.of(inputDescriptor, outputDescriptor)); + } + } catch (ReflectiveOperationException e) { + //Cannot instantiate I/O dataset descriptor due to missing config; skip and try the next one. + } + inputPrefix = Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(++i)); } return result; } private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig) throws ReflectiveOperationException { - Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY)); - return (DatasetDescriptor) GobblinConstructorUtils - .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig); + Class datasetDescriptorClass = Class.forName(descriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); + return (DatasetDescriptor) GobblinConstructorUtils.invokeLongestConstructor(datasetDescriptorClass, descriptorConfig); } @Override @@ -130,27 +137,53 @@ public class StaticFlowTemplate implements FlowTemplate { return this.rawConfig; } - private void ensureTemplateMaterialized() - throws IOException { - try { - if (!isTemplateMaterialized) { - this.dag = JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates); - } - this.isTemplateMaterialized = true; - } catch (Exception e) { - throw new IOException(e); - } - } - @Override public List<JobTemplate> getJobTemplates() { return this.jobTemplates; } + private Config getResolvedFlowConfig(Config userConfig) { + return userConfig.withFallback(this.rawConfig); + } + + /** + * Checks if the {@link FlowTemplate} is resolvable using the provided {@link Config} object. 
A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable
+   * @param userConfig User supplied Config
+   * @return true if the {@link FlowTemplate} is resolvable
+   */
   @Override
-  public Dag<JobTemplate> getDag()
-      throws IOException {
-    ensureTemplateMaterialized();
-    return this.dag;
+  public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
+    Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+    ConfigResolveOptions resolveOptions = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+
+    for (JobTemplate template: this.jobTemplates) {
+      Config templateConfig = template.getResolvedConfig(userConfig).resolve(resolveOptions);
+      if (!templateConfig.isResolved()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor inputDescriptor,
+      DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
+    Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+    List<Config> resolvedJobConfigs = new ArrayList<>();
+    for (JobTemplate jobTemplate: getJobTemplates()) {
+      Config 
resolvedJobConfig = jobTemplate.getResolvedConfig(userConfig).resolve().withValue(
+          ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));
+      resolvedJobConfigs.add(resolvedJobConfig);
+    }
+    return resolvedJobConfigs;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
index 5dba91c..59c3c87 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
@@ -19,16 +19,23 @@ package org.apache.gobblin.service.modules.template_catalog;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
+import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -41,17 +48,22 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.apache.gobblin.service.modules.template.HOCONInputStreamFlowTemplate;
 import org.apache.gobblin.util.PathUtils;
 
+
 /**
  * An implementation of a catalog 
for {@link FlowTemplate}s. Provides basic API for retrieving a {@link FlowTemplate} * from the catalog and for retrieving {@link JobTemplate}s that are part of a {@link FlowTemplate}. * The flow and job configuration files are assumed to have the following path structure: - * <p> /path/to/template/catalog/flowName/flow.(conf|pull) </p> - * <p> /path/to/template/catalog/flowName/jobs/job1.(conf|pull) </p> - * <p> /path/to/template/catalog/flowName/jobs/job2.(conf|pull) </p> + * <p> /path/to/template/catalog/flowName/flow.conf </p> + * <p> /path/to/template/catalog/flowName/jobs/job1.(job|template) </p> + * <p> /path/to/template/catalog/flowName/jobs/job2.(job|template) </p> */ @Alpha public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTemplates { - public static final String JOB_TEMPLATE_DIR_NAME="jobs"; + public static final String JOBS_DIR_NAME = "jobs"; + public static final String FLOW_CONF_FILE_NAME = "flow.conf"; + public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", ".template"); + public static final String JOB_TEMPLATE_KEY = "gobblin.template.uri"; + protected static final String FS_SCHEME = "FS"; /** @@ -59,64 +71,94 @@ public class FSFlowCatalog extends FSJobCatalog implements FlowCatalogWithTempla * @param sysConfig that must contain the fully qualified path of the flow template catalog * @throws IOException */ - public FSFlowCatalog(Config sysConfig) throws IOException { - super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))); + public FSFlowCatalog(Config sysConfig) + throws IOException { + super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))); } /** * - * @param flowUri URI of the flow configuration file + * @param flowTemplateDirURI URI of the flow template directory * @return a {@link 
FlowTemplate} * @throws SpecNotFoundException * @throws JobTemplate.TemplateException * @throws IOException */ - public FlowTemplate getFlowTemplate(URI flowUri) throws SpecNotFoundException, JobTemplate.TemplateException, IOException { + public FlowTemplate getFlowTemplate(URI flowTemplateDirURI) + throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException { if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) { throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY); } - if (!flowUri.getScheme().equals(FS_SCHEME)) { - throw new RuntimeException("Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowUri.getScheme()); + if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) { + throw new RuntimeException( + "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme()); } String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY); // path of uri is location of template file relative to the job configuration root directory - Path templateFullPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowUri.getPath())); + Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI.getPath())); + Path templateFullPath = PathUtils.mergePaths(templateDirPath, new Path(FLOW_CONF_FILE_NAME)); FileSystem fs = FileSystem.get(templateFullPath.toUri(), new Configuration()); try (InputStream is = fs.open(templateFullPath)) { - return new HOCONInputStreamFlowTemplate(is, flowUri, this); - } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); + return new HOCONInputStreamFlowTemplate(is, flowTemplateDirURI, this); } } /** * - * @param flowTemplateDirUri URI of the flow template directory + * @param flowTemplateDirURI URI of the flow template directory * @return a list of {@link 
JobTemplate}s for a given flow identified by its {@link URI}. * @throws IOException * @throws SpecNotFoundException * @throws JobTemplate.TemplateException */ - public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirUri) - throws IOException, SpecNotFoundException, JobTemplate.TemplateException { + public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI) + throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { + + PathFilter extensionFilter = file -> { + for (String extension : JOB_FILE_EXTENSIONS) { + if (file.getName().endsWith(extension)) { + return true; + } + } + return false; + }; + if (!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)) { throw new RuntimeException("Missing config " + ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY); } - if (!flowTemplateDirUri.getScheme().equals(FS_SCHEME)) { - throw new RuntimeException("Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirUri.getScheme()); + if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) { + throw new RuntimeException( + "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme()); } List<JobTemplate> jobTemplates = new ArrayList<>(); String templateCatalogDir = this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY); - Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirUri)); - Path jobTemplatePath = new Path(templateDirPath, JOB_TEMPLATE_DIR_NAME); + Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(flowTemplateDirURI)); + Path jobTemplatePath = new Path(templateDirPath, JOBS_DIR_NAME); FileSystem fs = FileSystem.get(jobTemplatePath.toUri(), new Configuration()); - for (FileStatus fileStatus : fs.listStatus(jobTemplatePath)) { - try (InputStream is = fs.open(fileStatus.getPath())) { - jobTemplates.add(new 
HOCONInputStreamJobTemplate(is, fileStatus.getPath().toUri(), this)); + + for (FileStatus fileStatus : fs.listStatus(jobTemplatePath, extensionFilter)) { + Config templateConfig = loadHoconFileAtPath(fileStatus.getPath(), true); + if (templateConfig.hasPath(JOB_TEMPLATE_KEY)) { + URI templateUri = new URI(templateConfig.getString(JOB_TEMPLATE_KEY)); + //Strip out the initial "/" + URI actualResourceUri = new URI(templateUri.getPath().substring(1)); + Path fullTemplatePath = + new Path(FSFlowCatalog.class.getClassLoader().getResource(actualResourceUri.getPath()).toURI()); + templateConfig = templateConfig.withFallback(loadHoconFileAtPath(fullTemplatePath, true)); } + jobTemplates.add(new HOCONInputStreamJobTemplate(templateConfig, fileStatus.getPath().toUri(), this)); } return jobTemplates; } + + private Config loadHoconFileAtPath(Path filePath, boolean allowUnresolved) + throws IOException { + ConfigResolveOptions options = ConfigResolveOptions.defaults().setAllowUnresolved(allowUnresolved); + try (InputStream is = fs.open(filePath)) { + return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java index 41c0c9e..8bafe97 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java @@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.template_catalog; 
import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import org.apache.gobblin.runtime.api.JobTemplate; @@ -34,16 +35,18 @@ public interface FlowCatalogWithTemplates { * Get {@link FlowTemplate} with given {@link URI}. * @throws SpecNotFoundException if a {@link JobTemplate} with given {@link URI} cannot be found. */ - FlowTemplate getFlowTemplate(URI uri) throws SpecNotFoundException, IOException, JobTemplate.TemplateException; + FlowTemplate getFlowTemplate(URI uri) + throws SpecNotFoundException, IOException, JobTemplate.TemplateException, URISyntaxException; /** * - * @param flowUri + * @param flowTemplateDirURI URI of the flow template directory. * @return a list of {@link JobTemplate}s for a given flow identified by its {@link URI}. * @throws IOException * @throws SpecNotFoundException * @throws JobTemplate.TemplateException */ - public List<JobTemplate> getJobTemplatesForFlow(URI flowUri) throws IOException, SpecNotFoundException, JobTemplate.TemplateException; + public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI) + throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java new file mode 100644 index 0000000..b5451bc --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.core; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.SystemUtils; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.RepositoryCache; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.util.FS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; 
+import org.apache.gobblin.service.modules.flowgraph.DataNode; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.service.modules.flowgraph.FlowEdge; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; + + +public class GitFlowGraphMonitorTest { + private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class); + private Repository remoteRepo; + private Git gitForPush; + private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir"; + private final File remoteDir = new File(TEST_DIR + "/remote"); + private final File cloneDir = new File(TEST_DIR + "/clone"); + private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph"); + private static final String NODE_1_FILE = "node1.properties"; + private final File node1Dir = new File(flowGraphDir, "node1"); + private final File node1File = new File(node1Dir, NODE_1_FILE); + private static final String NODE_2_FILE = "node2.properties"; + private final File node2Dir = new File(flowGraphDir, "node2"); + private final File node2File = new File(node2Dir, NODE_2_FILE); + private final File edge1Dir = new File(node1Dir, "node2"); + private final File edge1File = new File(edge1Dir, "edge1.properties"); + + private RefSpec masterRefSpec = new RefSpec("master"); + private FSFlowCatalog flowCatalog; + private Config config; + private BaseFlowGraph flowGraph; + private GitFlowGraphMonitor gitFlowGraphMonitor; + + @BeforeClass + public void setUp() throws Exception { + cleanUpDir(TEST_DIR); + + // Create a bare repository + RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED); + this.remoteRepo = fileKey.open(false); + this.remoteRepo.create(true); + + this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call(); + + // push an empty commit as a base for detecting changes + this.gitForPush.commit().setMessage("First 
commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.config = ConfigBuilder.create() + .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + + ConfigurationKeys.GIT_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath()) + .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph") + .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5) + .build(); + + // Create a FSFlowCatalog instance + URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); + Properties properties = new Properties(); + properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); + Config config = ConfigFactory.parseProperties(properties); + Config templateCatalogCfg = config + .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + this.flowCatalog = new FSFlowCatalog(templateCatalogCfg); + + //Create a FlowGraph instance with defaults + this.flowGraph = new BaseFlowGraph(); + + this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph); + this.gitFlowGraphMonitor.setActive(true); + } + + private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue) + throws IOException, GitAPIException { + // push a new node file + nodeDir.mkdirs(); + nodeFile.createNewFile(); + Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8); + + // add, commit, push node + this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call(); + this.gitForPush.commit().setMessage("Node commit").call(); + 
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + + //Check if node1 has been added to the FlowGraph + DataNode dataNode = this.flowGraph.getNode(nodeId); + Assert.assertEquals(dataNode.getId(), nodeId); + Assert.assertTrue(dataNode.isActive()); + Assert.assertEquals(dataNode.getRawConfig().getString("param1"), paramValue); + } + + @Test + public void testAddNode() + throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { + testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1"); + testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2"); + } + + @Test (dependsOnMethods = "testAddNode") + public void testAddEdge() + throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { + // push a new node file + this.edge1Dir.mkdirs(); + this.edge1File.createNewFile(); + + Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." 
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); + this.gitForPush.commit().setMessage("Edge commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + + //Check if edge1 has been added to the FlowGraph + Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); + Assert.assertEquals(edgeSet.size(), 1); + FlowEdge flowEdge = edgeSet.iterator().next(); + Assert.assertEquals(flowEdge.getSrc(), "node1"); + Assert.assertEquals(flowEdge.getDest(), "node2"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); + } + + @Test (dependsOnMethods = "testAddNode") + public void testUpdateEdge() + throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { + //Update edge1 file + 
Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n" + + "key1=value1\n", edge1File, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); + this.gitForPush.commit().setMessage("Edge commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + + //Check if new edge1 has been added to the FlowGraph + Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); + Assert.assertEquals(edgeSet.size(), 1); + FlowEdge flowEdge = edgeSet.iterator().next(); + Assert.assertEquals(flowEdge.getSrc(), "node1"); + Assert.assertEquals(flowEdge.getDest(), "node2"); + 
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); + Assert.assertEquals(flowEdge.getConfig().getString("key1"), "value1"); + } + + @Test (dependsOnMethods = "testUpdateEdge") + public void testUpdateNode() + throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { + //Update param1 value in node1 and check if updated node is added to the graph + testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3"); + } + + + @Test (dependsOnMethods = "testUpdateNode") + public void testRemoveEdge() throws GitAPIException, IOException { + // delete a config file + edge1File.delete(); + + //Node1 has 1 edge before delete + Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); + Assert.assertEquals(edgeSet.size(), 1); + + // delete, commit, push + DirCache ac = this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), + this.edge1Dir.getName(), this.edge1File.getName())).call(); + RevCommit cc = this.gitForPush.commit().setMessage("Edge remove commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + + //Check if edge1 has been deleted from the graph + edgeSet = this.flowGraph.getEdges("node1"); + Assert.assertTrue(edgeSet.size() == 0); + } + + @Test 
(dependsOnMethods = "testRemoveEdge") + public void testRemoveNode() throws GitAPIException, IOException { + //delete node file + node1File.delete(); + + //node1 is present in the graph before delete + DataNode node1 = this.flowGraph.getNode("node1"); + Assert.assertNotNull(node1); + + // delete, commit, push + DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); + RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + + //Check if node1 has been deleted from the graph + node1 = this.flowGraph.getNode("node1"); + Assert.assertNull(node1); + } + + + private void cleanUpDir(String dir) { + File specStoreDir = new File(dir); + + // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful + for (int i = 0; i < 5; i++) { + try { + if (specStoreDir.exists()) { + FileUtils.deleteDirectory(specStoreDir); + } + // if delete succeeded then break out of loop + break; + } catch (IOException e) { + logger.warn("Cleanup delete directory failed for directory: " + dir, e); + } + } + } + + private String formNodeFilePath(String groupDir, String fileName) { + return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; + } + + private String formEdgeFilePath(String parentDir, String groupDir, String fileName) { + return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; + } + + @AfterClass + public void tearDown() throws Exception { + cleanUpDir(TEST_DIR); + } +} \ No newline at end of file
