[GOBBLIN-491] Create a FlowGraph representation for multi-hop support in Gobblin-as-a-Service.[]
Closes #2361 from sv2000/flowGraph Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9b91fa1b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9b91fa1b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9b91fa1b Branch: refs/heads/master Commit: 9b91fa1b3bc7ec62aa3a4192dffd3fef63dfea94 Parents: 8949aa3 Author: suvasude <[email protected]> Authored: Mon Jun 4 21:05:50 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Jun 4 21:05:50 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 1 + .../apache/gobblin/runtime/api/JobTemplate.java | 6 + .../runtime/template/StaticJobTemplate.java | 19 +- ...ackagedTemplatesJobCatalogDecoratorTest.java | 3 + .../template/InheritingJobTemplateTest.java | 6 + .../dataset/BaseHdfsDatasetDescriptor.java | 99 ++++++++ .../modules/dataset/DatasetDescriptor.java | 44 ++++ .../modules/dataset/HdfsDatasetDescriptor.java | 40 ++++ .../service/modules/flowgraph/BaseDataNode.java | 81 +++++++ .../service/modules/flowgraph/BaseFlowEdge.java | 191 +++++++++++++++ .../modules/flowgraph/BaseFlowGraph.java | 189 +++++++++++++++ .../gobblin/service/modules/flowgraph/Dag.java | 153 ++++++++++++ .../service/modules/flowgraph/DataNode.java | 54 +++++ .../flowgraph/DatasetDescriptorConfigKeys.java | 28 +++ .../modules/flowgraph/FileSystemDataNode.java | 83 +++++++ .../service/modules/flowgraph/FlowEdge.java | 85 +++++++ .../modules/flowgraph/FlowEdgeFactory.java | 53 +++++ .../service/modules/flowgraph/FlowGraph.java | 76 ++++++ .../flowgraph/FlowGraphConfigurationKeys.java | 40 ++++ .../service/modules/flowgraph/HdfsDataNode.java | 59 +++++ .../modules/flowgraph/LocalFSDataNode.java | 51 ++++ .../service/modules/template/FlowTemplate.java | 62 +++++ .../template/HOCONInputStreamFlowTemplate.java | 56 +++++ 
.../modules/template/JobTemplateDagFactory.java | 79 +++++++ .../modules/template/StaticFlowTemplate.java | 154 +++++++++++++ .../modules/template_catalog/FSFlowCatalog.java | 122 ++++++++++ .../FlowCatalogWithTemplates.java | 49 ++++ .../modules/flowgraph/BaseFlowGraphTest.java | 230 +++++++++++++++++++ .../service/modules/flowgraph/DagTest.java | 133 +++++++++++ .../template/JobTemplateDagFactoryTest.java | 92 ++++++++ .../template_catalog/FSFlowCatalogTest.java | 107 +++++++++ .../flowgraph/BaseFlowEdgeFactoryTest.java | 73 ++++++ .../template_catalog/test-template/flow.conf | 16 ++ .../test-template/jobs/job1.conf | 2 + .../test-template/jobs/job2.conf | 3 + .../test-template/jobs/job3.conf | 3 + .../test-template/jobs/job4.conf | 3 + 37 files changed, 2537 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 2291d72..b6196da 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -214,6 +214,7 @@ public class ConfigurationKeys { * Configuration property used only for job configuration file's tempalte, inside .template file */ public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes"; + public static final String JOB_DEPENDENCIES="dependencies"; /** * Configuration for emitting job events http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java 
---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java index 1269bc4..f8f3071 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java @@ -47,6 +47,12 @@ public interface JobTemplate extends Spec { Collection<String> getRequiredConfigList() throws SpecNotFoundException, TemplateException; /** + * Retrieve all job names that this job depends on. Useful for building a dag of + * JobTemplates. + */ + Collection<String> getDependencies(); + + /** * Return the combine configuration of template and user customized attributes. * @return */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java index c370c62..6c69370 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java @@ -19,21 +19,18 @@ package org.apache.gobblin.runtime.template; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Properties; import java.util.Set; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import 
org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.GobblinInstanceDriver; -import org.apache.gobblin.runtime.api.JobCatalog; import org.apache.gobblin.runtime.api.JobCatalogWithTemplates; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -56,6 +53,8 @@ public class StaticJobTemplate extends InheritingJobTemplate { private final String version; @Getter private final String description; + @Getter + private Collection<String> dependencies; public StaticJobTemplate(URI uri, String version, String description, Config config, JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException { @@ -63,18 +62,22 @@ public class StaticJobTemplate extends InheritingJobTemplate { } protected StaticJobTemplate(URI uri, String version, String description, Config config, List<URI> superTemplateUris, - JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException { + JobCatalogWithTemplates catalog) + throws SpecNotFoundException, TemplateException { super(superTemplateUris, catalog); this.uri = uri; this.version = version; this.description = description; this.rawConfig = config; - this.requiredAttributes = config.hasPath(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST) - ? new HashSet<>(Arrays.asList(config.getString(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST).split(","))) + this.requiredAttributes = config.hasPath(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST) ? new HashSet<>( + Arrays.asList(config.getString(ConfigurationKeys.REQUIRED_ATRRIBUTES_LIST).split(","))) : Sets.<String>newHashSet(); + this.dependencies = config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? 
Arrays + .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>(); } - private static List<URI> getSuperTemplateUris(Config config) throws TemplateException { + private static List<URI> getSuperTemplateUris(Config config) + throws TemplateException { if (config.hasPath(SUPER_TEMPLATE_KEY)) { List<URI> uris = Lists.newArrayList(); for (String uriString : config.getString(SUPER_TEMPLATE_KEY).split(",")) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java index bf448cb..94f02ae 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/PackagedTemplatesJobCatalogDecoratorTest.java @@ -98,6 +98,9 @@ public class PackagedTemplatesJobCatalogDecoratorTest { public Config getResolvedConfig(Config userConfig) { return null; } + + @Override + public Collection<String> getDependencies() { return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java index ab76f18..c8d4183 100644 --- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/InheritingJobTemplateTest.java @@ -177,6 +177,7 @@ public class InheritingJobTemplateTest { private final URI uri; private final Map<String,String> rawTemplate; private final List<String> required; + private Collection<String> dependencies; public TestTemplate(URI uri, List<URI> superTemplateUris, Map<String, String> rawTemplate, List<String> required, JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException { @@ -227,6 +228,11 @@ public class InheritingJobTemplateTest { } return userConfig.withFallback(getLocalRawTemplate()); } + + @Override + public Collection<String> getDependencies() { + return this.dependencies; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java new file mode 100644 index 0000000..f7cf99f --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.dataset; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + +import lombok.Getter; + + +/** + * An implementation of {@link HdfsDatasetDescriptor}. + */ +@Alpha +public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor { + public static final String HDFS_PLATFORM_NAME = "hdfs"; + + @Getter + private final String path; + @Getter + private final String format; + @Getter + private final String description; + @Getter + private final String platform = HDFS_PLATFORM_NAME; + + public BaseHdfsDatasetDescriptor(Config config) { + Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY)); + Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY)); + + this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null); + this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null); + this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, ""); + } + + /** + * A {@link HdfsDatasetDescriptor} is 
compatible with another {@link DatasetDescriptor} iff they have identical + * platform, type, path, and format. + * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s + * being compared are identical. Need to enhance this for the case where paths can contain glob patterns. + * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the + * two descriptors should be compatible. + * @return true if this {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor}. + */ + @Override + public boolean isCompatibleWith(DatasetDescriptor o) { + return this.equals(o); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o; + if(this.getPlatform() == null || other.getPlatform() == null) { + return false; + } + if(!this.getPlatform().equalsIgnoreCase(other.getPlatform()) || !(o instanceof HdfsDatasetDescriptor)) { + return false; + } + + return this.getPath().equals(other.getPath()) && this.getFormat().equalsIgnoreCase(other.getFormat()); + } + + @Override + public String toString() { + return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")"; + } + + @Override + public int hashCode() { + return this.toString().hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java new file mode 100644 index 0000000..4a322e6 --- /dev/null +++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.dataset; + +import org.apache.gobblin.annotation.Alpha; + + +/** + * The interface for dataset descriptors. + */ +@Alpha +public interface DatasetDescriptor { + /** + * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.) + */ + public String getPlatform(); + + /** + * @return a human-readable description of the dataset. + */ + public String getDescription(); + + /** + * @return true if this {@link DatasetDescriptor} is compatible with the other {@link DatasetDescriptor} i.e. the + * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other {@link DatasetDescriptor}. + * This check is non-commutative. 
+ */ + public boolean isCompatibleWith(DatasetDescriptor other); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java new file mode 100644 index 0000000..6f1970c --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.dataset; + +import org.apache.gobblin.annotation.Alpha; + + +/** + * A descriptor interface for HDFS datasets + */ +@Alpha +public interface HdfsDatasetDescriptor extends DatasetDescriptor { + /** + * + * @return dataset path. + */ + public String getPath(); + + /** + * + * @return storage format of the dataset. 
+ */ + public String getFormat(); + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java new file mode 100644 index 0000000..731bc22 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; +import org.apache.gobblin.util.ConfigUtils; + +import joptsimple.internal.Strings; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * An implementation of {@link DataNode}. 
+ */ +@Alpha +@Slf4j +public class BaseDataNode implements DataNode { + @Getter + private String id; + @Getter + private Config props; + @Getter + private boolean active = true; + + public BaseDataNode(Config nodeProps) throws DataNodeCreationException { + try { + String nodeId = ConfigUtils.getString(nodeProps, FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(nodeId), "Node Id cannot be null or empty"); + this.id = nodeId; + if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) { + this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY); + } + this.props = nodeProps; + } catch(Exception e) { + throw new DataNodeCreationException(e); + } + } + + /** + * The comparison between two nodes should involve the configuration. + * Node name is the identifier for the node. + * */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BaseDataNode that = (BaseDataNode) o; + + return id.equals(that.getId()); + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java new file mode 100644 index 0000000..ccce62e --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.service.modules.template.FlowTemplate; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import org.apache.gobblin.util.ConfigUtils; + +import joptsimple.internal.Strings; +import lombok.Getter; + + +/** + * An implementation of {@link FlowEdge}. 
+ */ +@Alpha +public class BaseFlowEdge implements FlowEdge { + public static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":"; + + @Getter + protected List<String> endPoints; + + @Getter + protected FlowTemplate flowTemplate; + + @Getter + private List<SpecExecutor> executors; + + @Getter + private Config props; + + @Getter + private String id; + + @Getter + private boolean active; + + //Constructor + public BaseFlowEdge(List<String> endPoints, String edgeName, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) { + this.endPoints = endPoints; + this.flowTemplate = flowTemplate; + this.executors = executors; + this.active = active; + this.props = properties; + this.id = generateEdgeId(endPoints, edgeName); + } + + @Override + public boolean isAccessible(UserGroupInformation user) { + return true; + } + + @VisibleForTesting + protected static String generateEdgeId(List<String> endPoints, String edgeName) { + return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(endPoints.get(0), endPoints.get(1), edgeName); + } + /** + * The {@link FlowEdge}s are the same if they have the same endpoints and both refer to the same {@FlowTemplate} i.e. + * the {@link FlowTemplate} uris are the same + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlowEdge that = (FlowEdge) o; + + if(!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) { + return false; + } + + if(!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } + + @Override + public String toString() { + return this.id; + } + + /** + * A {@link FlowEdgeFactory} for creating {@link BaseFlowEdge}. 
+ */ + public static class Factory implements FlowEdgeFactory { + + /** + * A method to return an instance of {@link BaseFlowEdge}. The method performs all the validation checks + * and returns + * @param edgeProps Properties of edge + * @param flowCatalog Flow Catalog used to retrieve {@link FlowTemplate}s. + * @return a {@link BaseFlowEdge} + */ + @Override + public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException { + try { + String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source"); + String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(destination), "A FlowEdge must have a non-null or empty destination"); + List<String> endPoints = Lists.newArrayList(source, destination); + String edgeName = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name"); + + List<Config> specExecutorConfigList = new ArrayList<>(); + boolean flag; + for(int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)) != false; i++) { + specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." 
+ i)); + } + + String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, ""); + + //Perform basic validation + Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points"); + Preconditions + .checkArgument(specExecutorConfigList.size() > 0, "A FlowEdge must have at least one SpecExecutor"); + Preconditions + .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty"); + boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true); + + //Build SpecExecutor from config + List<SpecExecutor> specExecutors = new ArrayList<>(); + + for (Config specExecutorConfig : specExecutorConfigList) { + Class executorClass = Class.forName(specExecutorConfig.getString(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY)); + SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig); + specExecutors.add(executor); + } + FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri)); + return new BaseFlowEdge(endPoints, edgeName, flowTemplate, specExecutors, edgeProps, isActive); + } catch (Exception e) { + throw new FlowEdgeCreationException(e); + } + } + + @Override + public String getEdgeId(Config edgeProps) throws IOException { + String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source"); + String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty destination"); + String edgeName = + ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, ""); + 
Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name"); + List<String> endPoints = Lists.newArrayList(source, destination); + + return generateEdgeId(endPoints, edgeName); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java new file mode 100644 index 0000000..783f7ea --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.gobblin.annotation.Alpha; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A thread-safe implementation of {@link FlowGraph}. 
The implementation maintains the following data structures: + * <p>dataNodeMap - the mapping from a node identifier to the {@link DataNode} instance</p> + * <p>nodesToEdges - the mapping from each {@link DataNode} to its outgoing {@link FlowEdge}s</p> + * <p>flowEdgeMap - the mapping from an edge label to the {@link FlowEdge} instance</p> + */ +@Alpha +@Slf4j +public class BaseFlowGraph implements FlowGraph { + private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>(); + private Map<String, DataNode> dataNodeMap = new HashMap<>(); + private Map<String, FlowEdge> flowEdgeMap = new HashMap<>(); + + /** + * Lookup a node by its identifier. + * + * @param nodeId node identifier + * @return {@link DataNode} with nodeId as the identifier. + */ + public DataNode getNode(String nodeId) { + return this.dataNodeMap.getOrDefault(nodeId, null); + } + + /** + * Add a {@link DataNode} to the {@link FlowGraph}. If the node already "exists" in the {@link FlowGraph} (i.e. the + * FlowGraph already has another node with the same id), we remove the old node and add the new one. The + * edges incident on the old node are preserved. + * @param node to be added to the {@link FlowGraph} + * @return true if node is successfully added to the {@link FlowGraph}. + */ + @Override + public synchronized boolean addDataNode(DataNode node) { + //Get edges adjacent to the node if it already exists + Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>()); + this.nodesToEdges.put(node, edges); + this.dataNodeMap.put(node.getId(), node); + return true; + } + + /** + * Add a {@link FlowEdge} to the {@link FlowGraph}. Addition of edge succeeds only if both the end points of the + * edge are already nodes in the FlowGraph. If a {@link FlowEdge} already exists, the old FlowEdge is removed and + * the new one added in its place. + * @param edge + * @return true if addition of {@link FlowEdge} is successful.
+ */ + @Override + public synchronized boolean addFlowEdge(FlowEdge edge) { + String srcNode = edge.getEndPoints().get(0); + String dstNode = edge.getEndPoints().get(1); + if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) { + return false; + } + DataNode dataNode = getNode(srcNode); + if(dataNode != null) { + Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(dataNode); + if(!adjacentEdges.add(edge)) { + adjacentEdges.remove(edge); + adjacentEdges.add(edge); + } + this.nodesToEdges.put(dataNode, adjacentEdges); + String edgeId = edge.getId(); + this.flowEdgeMap.put(edgeId, edge); + return true; + } else { + return false; + } + } + + /** + * Delete a {@link DataNode} by its identifier + * @param nodeId identifier of the {@link DataNode} to be deleted. + * @return true if {@link DataNode} is successfully deleted. + */ + @Override + public synchronized boolean deleteDataNode(String nodeId) { + if(this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId))) { + return true; + } else { + return false; + } + } + + /** + * Delete a {@link DataNode} from the {@link FlowGraph}. + * @param node to be deleted. + * @return true if {@link DataNode} is successfully deleted. + */ + public synchronized boolean deleteDataNode(DataNode node) { + if(dataNodeMap.containsKey(node.getId())) { + //Delete node from dataNodeMap + dataNodeMap.remove(node.getId()); + + //Delete all the edges adjacent to the node. First, delete edges from flowEdgeMap and next, remove the edges + // from nodesToEdges + for(FlowEdge edge: nodesToEdges.get(node)) { + flowEdgeMap.remove(edge.getId()); + } + nodesToEdges.remove(node); + return true; + } else { + return false; + } + } + + /** + * Delete a {@link FlowEdge} by its identifier + * @param edgeId identifier of the {@link FlowEdge} to be deleted. + * @return true if {@link FlowEdge} is successfully deleted. 
+ */ + @Override + public synchronized boolean deleteFlowEdge(String edgeId) { + if(flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId))) { + return true; + } else { + return false; + } + } + + /** + * Delete a {@link FlowEdge} from the {@link FlowGraph}. + * @param edge to be deleted. + * @return true if {@link FlowEdge} is successfully deleted. If the source of a {@link FlowEdge} does not exist or + * if the {@link FlowEdge} is not in the graph, return false. + */ + public synchronized boolean deleteFlowEdge(FlowEdge edge) { + if(!dataNodeMap.containsKey(edge.getEndPoints().get(0))) { + return false; + } + DataNode node = dataNodeMap.get(edge.getEndPoints().get(0)); + if(!nodesToEdges.get(node).contains(edge)) { + return false; + } + this.nodesToEdges.get(node).remove(edge); + this.flowEdgeMap.remove(edge.getId()); + return true; + } + + /** + * Get the set of edges adjacent to a {@link DataNode} + * @param nodeId identifier of the node + * @return Set of {@link FlowEdge}s adjacent to the node. + */ + @Override + public Set<FlowEdge> getEdges(String nodeId) { + DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null); + return getEdges(dataNode); + } + + /** + * Get the set of edges adjacent to a {@link DataNode} + * @param node {@link DataNode} + * @return Set of {@link FlowEdge}s adjacent to the node. + */ + @Override + public Set<FlowEdge> getEdges(DataNode node) { + return (node != null)? 
this.nodesToEdges.getOrDefault(node, null) : null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java new file mode 100644 index 0000000..8ae0027 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; + +import org.apache.gobblin.annotation.Alpha; + +import lombok.Getter; + + +/** + * An implementation of Dag. Assumes that nodes have unique values. Nodes with duplicate values will produce + * unpredictable behavior. 
+ */ +@Alpha +@Getter +public class Dag<T> { + private List<DagNode<T>> startNodes; + private List<DagNode<T>> endNodes; + // Map to maintain parent to children mapping. + private Map<DagNode, List<DagNode<T>>> parentChildMap; + private List<DagNode<T>> nodes; + + public Dag(List<DagNode<T>> dagNodes) { + this.nodes = dagNodes; + //Build dag + this.build(); + } + + /** + * Constructs the dag from the Node list. + */ + private void build() { + this.startNodes = new ArrayList<>(); + this.endNodes = new ArrayList<>(); + this.parentChildMap = new HashMap<>(); + for (DagNode node : this.nodes) { + //If a Node has no parent Node, add it to the list of start Nodes + if (node.getParentNodes() == null) { + this.startNodes.add(node); + } else { + List<DagNode> parentNodeList = node.getParentNodes(); + for (DagNode parentNode : parentNodeList) { + if (parentChildMap.containsKey(parentNode)) { + parentChildMap.get(parentNode).add(node); + } else { + parentChildMap.put(parentNode, Lists.newArrayList(node)); + } + } + } + } + //Iterate over all the Nodes and add a Node to the list of endNodes if it is not present in the parentChildMap + for (DagNode node : this.nodes) { + if (!parentChildMap.containsKey(node)) { + this.endNodes.add(node); + } + } + } + + public List<DagNode<T>> getChildren(DagNode node) { + return parentChildMap.getOrDefault(node, null); + } + + public List<DagNode<T>> getParents(DagNode node) { + return node.parentNodes; + } + + /** + * Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag. + * The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes) + * are completed before starting any job of the "other" dag. This is done by adding each endNode of this dag as + * a parent of every startNode of the other dag. 
+ * + * @param other dag to concatenate to this dag + * @return the concatenated dag + */ + public Dag<T> concatenate(Dag<T> other) throws IOException { + if (other == null) { + return this; + } + for (DagNode node : this.endNodes) { + this.parentChildMap.put(node, Lists.newArrayList()); + for (DagNode otherNode : other.startNodes) { + this.parentChildMap.get(node).add(otherNode); + otherNode.addParentNode(node); + } + this.endNodes = other.endNodes; + } + return this; + } + + @Getter + public static class DagNode<T> { + private T value; + //List of parent Nodes that are dependencies of this Node. + private List<DagNode<T>> parentNodes; + + //Constructor + public DagNode(T value) { + this.value = value; + } + + public void addParentNode(DagNode<T> node) { + if (parentNodes == null) { + parentNodes = Lists.newArrayList(node); + return; + } + parentNodes.add(node); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DagNode that = (DagNode) o; + if (!this.getValue().equals(that.getValue())) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return this.getValue().hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java new file mode 100644 index 0000000..4931685 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; + + +/** + * Representation of a node in the FlowGraph. Each node is identified by a unique identifier. + */ +@Alpha +public interface DataNode { + /** + * @return the identifier of a {@link DataNode}. + */ + String getId(); + + /** + * @return the attributes of a {@link DataNode}. It also includes properties for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate} + * e.g. "source.fs.uri" for an HDFS node, "jdbc.publisher.url" for JDBC node. 
+ */ + Config getProps(); + + /** + * @return true if the {@link DataNode} is active + */ + boolean isActive(); + + class DataNodeCreationException extends Exception { + private static final String MESSAGE_FORMAT = "Failed to create DataNode because of: %s"; + + public DataNodeCreationException(Exception e) { + super(String.format(MESSAGE_FORMAT, e.getMessage()), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java new file mode 100644 index 0000000..e98337d --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flowgraph; + +/** + * Config keys related to {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}. + */ +public class DatasetDescriptorConfigKeys { + //Gobblin Service Dataset Descriptor related keys + public static final String PATH_KEY = "path"; + public static final String FORMAT_KEY = "format"; + public static final String DESCRIPTION_KEY = "description"; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java new file mode 100644 index 0000000..5899645 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.io.IOException; +import java.net.URI; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.util.ConfigUtils; + +import joptsimple.internal.Strings; +import lombok.Getter; + + +/** + * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode} + * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}. + */ +@Alpha +public abstract class FileSystemDataNode extends BaseDataNode { + public static final String FS_URI_KEY = "fs.uri"; + @Getter + private String fsUri; + + /** + * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id. + */ + public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException { + super(nodeProps); + try { + this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, ""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode"); + URI uri = new URI(this.fsUri); + if(!isUriValid(uri)) { + throw new IOException("Invalid FS URI " + this.fsUri); + } + } catch(Exception e) { + throw new DataNodeCreationException(e); + } + } + + public abstract boolean isUriValid(URI fsUri); + /** + * Two HDFS DataNodes are the same if they have the same id and the same fsUri. 
+ */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileSystemDataNode that = (FileSystemDataNode) o; + + return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri()); + } + + @Override + public int hashCode() { + return Joiner.on("-").join(this.getId(), this.fsUri).hashCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java new file mode 100644 index 0000000..fb60d67 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.util.List; + +import com.typesafe.config.Config; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.service.modules.template.FlowTemplate; + + +/** + * Representation of an edge in a FlowGraph. Each {@link FlowEdge} encapsulates: + * <p><ul> + * <li> two {@link DataNode}s as its end points + * <li> a {@link FlowTemplate} that is responsible for data movement between the {@link DataNode}s. + * <li> a list of {@link SpecExecutor}s where the {@link FlowTemplate} can be executed. + * </ul></p> + * + */ +@Alpha +public interface FlowEdge { + /** + * + * @return the {@link DataNode} ids that are the end points of the edge. + */ + List<String> getEndPoints(); + + /** + * + * @return the {@link FlowTemplate} that performs the data movement along the edge. + */ + FlowTemplate getFlowTemplate(); + + /** + * + * @return a list of {@link SpecExecutor}s that can execute the {@link FlowTemplate} corresponding to this edge. + */ + List<SpecExecutor> getExecutors(); + + /** + * Get the properties that define the {@link FlowEdge}. Encapsulates all the properties from which the {@link FlowEdge} + * is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}. + * @return the properties of this edge as a {@link Config} object. + */ + Config getProps(); + + /** + * A string uniquely identifying the edge. + * @return the label of the {@link FlowEdge}. + */ + String getId(); + + /** + * + * @return true if the {@link FlowEdge} is active. 
+ */ + boolean isActive(); + + /** + * + * @param user + * @return true if the user has ACL permissions to access the {@link FlowEdge}, + */ + boolean isAccessible(UserGroupInformation user); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java new file mode 100644 index 0000000..851e887 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; + +import com.typesafe.config.Config; + + +public interface FlowEdgeFactory { + /** + * Construct a {@link FlowEdge} from the edge properties + * @param edgeProps properties of the {@link FlowEdge} + * @param catalog an instance of {@link FSFlowCatalog} that returns {@link org.apache.gobblin.service.modules.template.FlowTemplate}s + * useful for creating a {@link FlowEdge}. + * @return an instance of {@link FlowEdge} + * @throws FlowEdgeCreationException + */ + public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException; + + /** + * Get an edge label from the edge properties + * @param edgeProps properties of the edge + * @return a string label identifying the edge + */ + public String getEdgeId(Config edgeProps) throws IOException; + + public class FlowEdgeCreationException extends Exception { + private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s"; + + public FlowEdgeCreationException(Exception e) { + super(String.format(MESSAGE_FORMAT, e.getMessage()), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java new file mode 100644 index 0000000..23f5793 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.util.Collection; + +import org.apache.gobblin.annotation.Alpha; + + +/** + * An interface for {@link FlowGraph}. A {@link FlowGraph} consists of {@link DataNode}s and {@link FlowEdge}s. + * The interface provides methods for adding and removing {@link DataNode}s and {@link FlowEdge}s to the {@link FlowGraph}. + * In addition the interface provides methods to return factory classes for creation of {@link DataNode}s and {@link FlowEdge}s. + */ + +@Alpha +public interface FlowGraph { + + /** + * Add a {@link DataNode} to the {@link FlowGraph} + * @param node {@link DataNode} to be added + * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully. + */ + public boolean addDataNode(DataNode node); + + /** + * Add a {@link FlowEdge} to the {@link FlowGraph} + * @param edge {@link FlowEdge} to be added + * @return true if {@link FlowEdge} is added to the {@link FlowGraph} successfully. + */ + public boolean addFlowEdge(FlowEdge edge); + + /** + * Remove a {@link DataNode} and all its incident edges from the {@link FlowGraph} + * @param nodeId identifier of the {@link DataNode} to be removed + * @return true if {@link DataNode} is removed from the {@link FlowGraph} successfully. 
+ */ + public boolean deleteDataNode(String nodeId); + + /** + * Remove a {@link FlowEdge} from the {@link FlowGraph} + * @param edgeId label of the edge to be removed + * @return true if edge is removed from the {@link FlowGraph} successfully. + */ + public boolean deleteFlowEdge(String edgeId); + + /** + * Get a collection of edges adjacent to a {@link DataNode}. Useful for path finding algorithms and graph + * traversal algorithms such as Dijkstra's shortest-path algorithm and BFS. + * @param nodeId identifier of the {@link DataNode} + * @return a collection of edges adjacent to the {@link DataNode} + */ + public Collection<FlowEdge> getEdges(String nodeId); + + /** + * Get a collection of edges adjacent to a {@link DataNode}. + * @param node {@link DataNode} + * @return a collection of edges adjacent to the {@link DataNode} + */ + public Collection<FlowEdge> getEdges(DataNode node); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java new file mode 100644 index 0000000..0d94e3f --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +public class FlowGraphConfigurationKeys { + public static final String DATA_NODE_PREFIX = "data.node."; + public static final String FLOW_EDGE_PREFIX = "flow.edge."; + + /** + * {@link DataNode} configuration keys. + */ + public static final String DATA_NODE_ID_KEY = DATA_NODE_PREFIX + "id"; + public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive"; + + /** + * {@link FlowEdge} configuration keys. + */ + public static final String FLOW_EDGE_SOURCE_KEY = FLOW_EDGE_PREFIX + "source"; + public static final String FLOW_EDGE_DESTINATION_KEY = FLOW_EDGE_PREFIX + "destination"; + public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name"; + public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive"; + public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri"; + public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX +"specExecutors"; + public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass"; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java new file mode 100644 index 
0000000..7bcc18d --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.net.URI; + +import com.typesafe.config.Config; + +import org.apache.gobblin.annotation.Alpha; + +import joptsimple.internal.Strings; + + +/** + * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri) + * are validated here. + */ +@Alpha +public class HdfsDataNode extends FileSystemDataNode { + public static final String HDFS_SCHEME = "hdfs"; + + public HdfsDataNode(Config nodeProps) throws DataNodeCreationException { + super(nodeProps); + } + + /** + * + * @param fsUri FileSystem URI + * @return true if the scheme is "hdfs" and authority is not empty. 
+ */ + @Override + public boolean isUriValid(URI fsUri) { + String scheme = fsUri.getScheme(); + //Check that the scheme is "hdfs" + if(!scheme.equals(HDFS_SCHEME)) { + return false; + } + //Ensure that the authority is not empty + if(Strings.isNullOrEmpty(fsUri.getAuthority())) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java new file mode 100644 index 0000000..6dc1aa3 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph; + +import java.net.URI; + +import org.apache.gobblin.annotation.Alpha; + +import com.typesafe.config.Config; + +/** + * An implementation of {@link LocalFSDataNode}. 
+ * All the properties specific to a LocalFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class LocalFSDataNode extends FileSystemDataNode {
+  public static final String LOCAL_FS_SCHEME = "file";
+
+  /**
+   * @param nodeProps configuration of this node; handed unchanged to the {@link FileSystemDataNode} constructor.
+   * @throws DataNodeCreationException propagated from the {@link FileSystemDataNode} constructor.
+   */
+  public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   * Checks whether a URI can identify a local file system.
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme of fsUri equals "file"; false otherwise, including for a URI that has no
+   * scheme at all.
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    // Compare constant-first: URI#getScheme() returns null for a scheme-less (relative) URI, and calling
+    // equals() on that null would throw a NullPointerException instead of reporting the URI as invalid.
+    return LOCAL_FS_SCHEME.equals(fsUri.getScheme());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
new file mode 100644
index 0000000..30d8309
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+/**
+ * An interface primarily for representing a flow of {@link JobTemplate}s. It also exposes the raw
+ * template {@link Config} and the input/output {@link DatasetDescriptor}s declared for the flow.
+ */
+@Alpha
+public interface FlowTemplate extends Spec {
+
+  /**
+   * @return the {@link List} of {@link JobTemplate}s that belong to this {@link FlowTemplate}.
+   */
+  List<JobTemplate> getJobTemplates();
+
+  /**
+   * @return the {@link Dag} of {@link JobTemplate}s that backs the {@link FlowTemplate}.
+   * @throws IOException if the {@link Dag} cannot be provided.
+   */
+  Dag<JobTemplate> getDag() throws IOException;
+
+  /**
+   * @return all configuration inside the pre-written template.
+   */
+  Config getRawTemplateConfig();
+
+  /**
+   * @return list of input/output {@link DatasetDescriptor} pairs for the {@link FlowTemplate}; each
+   * {@link Pair} holds one input descriptor together with one output descriptor.
+   */
+  List<Pair<DatasetDescriptor, DatasetDescriptor>> getInputOutputDatasetDescriptors();
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
new file mode 100644
index 0000000..3847fd2
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
+
+/**
+ * A {@link StaticFlowTemplate} whose raw {@link Config} is parsed from a HOCON {@link InputStream}
+ * (or supplied directly as an already parsed {@link Config}).
+ */
+@Alpha
+public class HOCONInputStreamFlowTemplate extends StaticFlowTemplate {
+  public static final String VERSION_KEY = "gobblin.flow.template.version";
+  public static final String DEFAULT_VERSION = "1";
+
+  /**
+   * Reads the stream as UTF-8 HOCON, resolves it while allowing unresolved substitutions, and delegates to
+   * {@link #HOCONInputStreamFlowTemplate(Config, URI, FlowCatalogWithTemplates)}.
+   */
+  public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+    this(parseAndResolve(inputStream), uri, catalog);
+  }
+
+  /**
+   * Takes the template version from {@link #VERSION_KEY} (falling back to {@link #DEFAULT_VERSION}) and the
+   * description from {@link ConfigurationKeys#FLOW_DESCRIPTION_KEY} (falling back to the empty string).
+   */
+  public HOCONInputStreamFlowTemplate(Config config, URI uri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, ReflectiveOperationException, JobTemplate.TemplateException {
+    super(uri, stringOrDefault(config, VERSION_KEY, DEFAULT_VERSION),
+        stringOrDefault(config, ConfigurationKeys.FLOW_DESCRIPTION_KEY, ""), config, catalog);
+  }
+
+  /** Parses the stream as UTF-8 HOCON and resolves it, tolerating unresolved substitutions. */
+  private static Config parseAndResolve(InputStream inputStream) {
+    Config parsed = ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8));
+    ConfigResolveOptions lenientOptions = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+    return parsed.resolve(lenientOptions);
+  }
+
+  /** Returns the string stored at {@code key} when the path is present, otherwise {@code fallback}. */
+  private static String stringOrDefault(Config config, String key, String fallback) {
+    if (config.hasPath(key)) {
+      return config.getString(key);
+    }
+    return fallback;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
new file mode 100644
index 0000000..b89da4d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A Factory class used for constructing a {@link Dag} of {@link org.apache.gobblin.runtime.api.JobTemplate}s from
+ * the {@link JobTemplate}s that make up a flow.
+ */
+@Alpha
+@Slf4j
+public class JobTemplateDagFactory {
+  public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf";
+
+  /**
+   * Builds a {@link Dag} containing one {@link Dag.DagNode} per {@link JobTemplate}, where the nodes of the
+   * templates named by {@link JobTemplate#getDependencies()} are registered as parents of the dependent node.
+   *
+   * <p>Each dependency name is resolved to the URI {@code <dir>/<dependency>.conf}, where {@code <dir>} is the
+   * parent directory of the first template's URI.
+   *
+   * @param jobTemplates the job templates of the flow; must contain at least one template.
+   * @return the {@link Dag} built from the nodes of all templates.
+   * @throws IllegalArgumentException if jobTemplates is null or empty, or if a template declares a dependency
+   * that does not resolve to the URI of any template in jobTemplates.
+   */
+  public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> jobTemplates) {
+    // Fail with a descriptive message instead of the IndexOutOfBoundsException that get(0) below would raise.
+    if (jobTemplates == null || jobTemplates.isEmpty()) {
+      throw new IllegalArgumentException("At least one job template is required to create a Dag");
+    }
+
+    // First pass: create a DagNode for every JobTemplate in the flow and index it by the template URI.
+    // Every node has to exist before parents are wired up, because a template may depend on a template
+    // that appears later in the list.
+    Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>();
+    List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>(jobTemplates.size());
+    for (JobTemplate template : jobTemplates) {
+      Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template);
+      dagNodeList.add(dagNode);
+      uriJobTemplateMap.put(template.getUri(), dagNode);
+    }
+
+    // Second pass: for each JobTemplate, look up the DagNodes of its dependencies and set them as the
+    // parent nodes of the template's own DagNode.
+    Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent();
+    for (JobTemplate template : jobTemplates) {
+      URI templateUri = template.getUri();
+      Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri);
+      Collection<String> dependencies = template.getDependencies();
+      for (String dependency : dependencies) {
+        URI dependencyUri = new Path(templateDirPath, dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri();
+        Dag.DagNode<JobTemplate> parentNode = uriJobTemplateMap.get(dependencyUri);
+        // An unknown dependency would otherwise hand a null parent to the node.
+        if (parentNode == null) {
+          throw new IllegalArgumentException(String.format(
+              "Job template %s declares dependency '%s' (%s), which is not among the job templates of the flow",
+              templateUri, dependency, dependencyUri));
+        }
+        node.addParentNode(parentNode);
+      }
+    }
+    return new Dag<>(dagNodeList);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9b91fa1b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
new file mode 100644
index 0000000..8347022
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.Path;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link FlowTemplate} using a static {@link Config} as the raw configuration for the template.
+ * The {@link Dag} of {@link JobTemplate}s is built lazily, on the first call to {@link #getDag()}.
+ */
+@Alpha
+public class StaticFlowTemplate implements FlowTemplate {
+  public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.input";
+  public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.dataset.descriptor.output";
+  public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class";
+
+  @Getter
+  private URI uri;
+  @Getter
+  private String version;
+  @Getter
+  private String description;
+  @Getter
+  private FlowCatalogWithTemplates catalog;
+  @Getter
+  private List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors;
+  @Getter
+  private List<JobTemplate> jobTemplates;
+
+  // Built on demand by ensureTemplateMaterialized().
+  private Dag<JobTemplate> dag;
+
+  private Config rawConfig;
+  private boolean isTemplateMaterialized;
+
+  /**
+   * Creates a template whose dataset descriptors are read from {@code config} and whose job templates are
+   * fetched from {@code catalog} for the parent directory of {@code uri}.
+   *
+   * @throws IOException if the dataset descriptors in {@code config} are missing or incomplete.
+   */
+  public StaticFlowTemplate(URI uri, String version, String description, Config config,
+      FlowCatalogWithTemplates catalog)
+      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+    this.uri = uri;
+    this.version = version;
+    this.description = description;
+    this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config);
+    this.rawConfig = config;
+    this.catalog = catalog;
+    URI flowTemplateDir = new Path(this.uri).getParent().toUri();
+    this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir);
+  }
+
+  //Constructor for testing purposes: the dataset descriptors and job templates are supplied directly
+  //instead of being derived from the config and the catalog.
+  public StaticFlowTemplate(URI uri, String version, String description, Config config,
+      FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> jobTemplates)
+      throws IOException, ReflectiveOperationException, SpecNotFoundException, JobTemplate.TemplateException {
+    this.uri = uri;
+    this.version = version;
+    this.description = description;
+    this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors;
+    this.rawConfig = config;
+    this.catalog = catalog;
+    this.jobTemplates = jobTemplates;
+  }
+
+  /**
+   * Generate the input/output dataset descriptors for the {@link FlowTemplate}.
+   *
+   * <p>Descriptors are read from consecutively numbered sub-configs, starting at 0:
+   * {@code gobblin.flow.dataset.descriptor.input.<i>} paired with
+   * {@code gobblin.flow.dataset.descriptor.output.<i>}. Reading stops at the first missing input index.
+   *
+   * @throws IOException if no input/output descriptor pair is configured, or if an input descriptor has no
+   * output descriptor with the same index.
+   */
+  private List<Pair<DatasetDescriptor, DatasetDescriptor>> buildInputOutputDescriptors(Config config)
+      throws IOException, ReflectiveOperationException {
+    if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || !config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
+      throw new IOException("Flow template must specify at least one input/output dataset descriptor");
+    }
+    List<Pair<DatasetDescriptor, DatasetDescriptor>> result = Lists.newArrayList();
+    for (int i = 0; config.hasPath(indexedKey(INPUT_DATASET_DESCRIPTOR_PREFIX, i)); i++) {
+      String inputKey = indexedKey(INPUT_DATASET_DESCRIPTOR_PREFIX, i);
+      DatasetDescriptor inputDescriptor = getDatasetDescriptor(config.getConfig(inputKey));
+
+      // Report an unpaired input through the declared IOException rather than letting getConfig() fail
+      // with an unchecked exception for the missing path.
+      String outputKey = indexedKey(OUTPUT_DATASET_DESCRIPTOR_PREFIX, i);
+      if (!config.hasPath(outputKey)) {
+        throw new IOException("Flow template specifies input dataset descriptor " + inputKey
+            + " without a matching output dataset descriptor " + outputKey);
+      }
+      DatasetDescriptor outputDescriptor = getDatasetDescriptor(config.getConfig(outputKey));
+      result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+    }
+    // Both prefixes can be present without any indexed entry; that still violates the "at least one" rule.
+    if (result.isEmpty()) {
+      throw new IOException("Flow template must specify at least one input/output dataset descriptor; none found at "
+          + indexedKey(INPUT_DATASET_DESCRIPTOR_PREFIX, 0));
+    }
+    return result;
+  }
+
+  /** Joins a descriptor prefix and an index into a config path, e.g. {@code <prefix>.0}. */
+  private static String indexedKey(String prefix, int index) {
+    return Joiner.on(".").join(prefix, Integer.toString(index));
+  }
+
+  /**
+   * Instantiates the {@link DatasetDescriptor} class named by the {@code class} key of the given config,
+   * passing the config itself as the constructor argument.
+   */
+  private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig)
+      throws ReflectiveOperationException {
+    Class<?> datasetDescriptorClass = Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY));
+    return (DatasetDescriptor) GobblinConstructorUtils
+        .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
+  }
+
+  @Override
+  public Config getRawTemplateConfig() {
+    return this.rawConfig;
+  }
+
+  /**
+   * Builds the {@link Dag} from the job templates on first use; later calls are no-ops.
+   *
+   * @throws IOException wrapping any failure raised while the {@link Dag} is built.
+   */
+  private void ensureTemplateMaterialized()
+      throws IOException {
+    if (this.isTemplateMaterialized) {
+      return;
+    }
+    try {
+      this.dag = JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    // Only flag the template as materialized once the Dag has actually been built.
+    this.isTemplateMaterialized = true;
+  }
+
+  @Override
+  public List<JobTemplate> getJobTemplates() {
+    return this.jobTemplates;
+  }
+
+  /**
+   * @return the {@link Dag} of {@link JobTemplate}s, built on the first call.
+   * @throws IOException if the {@link Dag} cannot be built from the job templates.
+   */
+  @Override
+  public Dag<JobTemplate> getDag()
+      throws IOException {
+    ensureTemplateMaterialized();
+    return this.dag;
+  }
+}
