[GOBBLIN-528] Multihop Flow Compiler for Gobblin-as-a-Service (GaaS). Closes #2393 from sv2000/multiHopCompiler
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/22a951f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/22a951f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/22a951f0 Branch: refs/heads/master Commit: 22a951f0a4ac0c963e99cd2a15989c62a08c81cf Parents: 33d4fea Author: suvasude <[email protected]> Authored: Mon Jul 30 09:57:31 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Jul 30 09:57:31 2018 -0700 ---------------------------------------------------------------------- .../template/HOCONInputStreamJobTemplate.java | 2 +- .../modules/core/GitMonitoringService.java | 2 +- .../dataset/BaseHdfsDatasetDescriptor.java | 98 ----- .../modules/dataset/DatasetDescriptor.java | 38 +- .../modules/dataset/EncryptionConfig.java | 90 ++++ .../modules/dataset/FSDatasetDescriptor.java | 138 ++++++ .../service/modules/dataset/FormatConfig.java | 102 +++++ .../modules/dataset/HdfsDatasetDescriptor.java | 40 -- .../service/modules/flow/FlowEdgeContext.java | 46 ++ .../service/modules/flow/FlowGraphPath.java | 90 ++++ .../modules/flow/FlowGraphPathFinder.java | 320 ++++++++++++++ .../modules/flow/MultiHopFlowCompiler.java | 157 +++++++ .../service/modules/flowgraph/BaseDataNode.java | 8 +- .../service/modules/flowgraph/BaseFlowEdge.java | 20 +- .../modules/flowgraph/BaseFlowGraph.java | 8 +- .../gobblin/service/modules/flowgraph/Dag.java | 10 +- .../service/modules/flowgraph/DataNode.java | 3 +- .../flowgraph/DatasetDescriptorConfigKeys.java | 18 + .../modules/flowgraph/FileSystemDataNode.java | 83 ---- .../service/modules/flowgraph/FlowEdge.java | 12 +- .../service/modules/flowgraph/FlowGraph.java | 7 + .../flowgraph/FlowGraphConfigurationKeys.java | 8 +- .../service/modules/flowgraph/HdfsDataNode.java | 59 --- .../modules/flowgraph/LocalFSDataNode.java | 51 --- .../flowgraph/datanodes/fs/AdlsDataNode.java | 52 +++ 
.../datanodes/fs/FileSystemDataNode.java | 87 ++++ .../flowgraph/datanodes/fs/HdfsDataNode.java | 58 +++ .../flowgraph/datanodes/fs/LocalFSDataNode.java | 51 +++ .../service/modules/spec/JobExecutionPlan.java | 117 ++++++ .../spec/JobExecutionPlanDagFactory.java | 114 +++++ .../service/modules/template/FlowTemplate.java | 39 +- .../template/HOCONInputStreamFlowTemplate.java | 13 +- .../modules/template/JobTemplateDagFactory.java | 79 ---- .../modules/template/StaticFlowTemplate.java | 143 ++++--- .../modules/template_catalog/FSFlowCatalog.java | 90 ++-- .../FlowCatalogWithTemplates.java | 9 +- .../modules/core/GitFlowGraphMonitorTest.java | 314 ++++++++++++++ .../modules/flow/FlowGraphPathFinderTest.java | 417 +++++++++++++++++++ .../flowgraph/BaseFlowEdgeFactoryTest.java | 74 ++++ .../modules/flowgraph/BaseFlowGraphTest.java | 13 +- .../spec/JobExecutionPlanDagFactoryTest.java | 116 ++++++ .../template/JobTemplateDagFactoryTest.java | 92 ---- .../template_catalog/FSFlowCatalogTest.java | 53 ++- .../src/test/resources/flow/flow.conf | 24 ++ .../datanodes/AdlsDataNode-1.properties | 3 + .../datanodes/HdfsDataNode-1.properties | 3 + .../datanodes/HdfsDataNode-2.properties | 3 + .../datanodes/HdfsDataNode-3.properties | 3 + .../datanodes/HdfsDataNode-4.properties | 3 + .../datanodes/LocalFsDataNode-1.properties | 3 + .../hdfs-1-to-hdfs-1-encrypt.properties | 9 + .../flowedges/hdfs-1-to-hdfs-3.properties | 10 + .../hdfs-2-to-hdfs-2-encrypt.properties | 9 + .../flowedges/hdfs-2-to-hdfs-4.properties | 9 + .../flowedges/hdfs-3-to-adls-1.properties | 13 + .../flowedges/hdfs-4-to-adls-1.properties | 13 + .../flowedges/local-to-hdfs-1.properties | 9 + .../flowedges/local-to-hdfs-2.properties | 9 + .../modules/core/GitFlowGraphMonitorTest.java | 314 -------------- .../flowgraph/BaseFlowEdgeFactoryTest.java | 73 ---- .../template_catalog/flowEdgeTemplate/flow.conf | 20 + .../flowEdgeTemplate/jobs/job1.job | 1 + .../flowEdgeTemplate/jobs/job2.job | 3 + 
.../flowEdgeTemplate/jobs/job3.job | 2 + .../flowEdgeTemplate/jobs/job4.job | 2 + .../hdfsConvertToJsonAndEncrypt/flow.conf | 18 + .../jobs/hdfs-encrypt-avro-to-json.job | 1 + .../flowEdgeTemplates/hdfsToAdl/flow.conf | 18 + .../hdfsToAdl/jobs/distcp-hdfs-to-adl.job | 1 + .../flowEdgeTemplates/hdfsToHdfs/flow.conf | 15 + .../hdfsToHdfs/jobs/distcp-hdfs-to-hdfs.job | 1 + .../flowEdgeTemplates/localToHdfs/flow.conf | 9 + .../localToHdfs/jobs/distcp-local-to-hdfs.job | 1 + .../distcp-push-hdfs-to-adl.template | 65 +++ .../multihop/jobTemplates/distcp.template | 57 +++ .../hdfs-convert-to-json-and-encrypt.template | 42 ++ .../template_catalog/templates/job1.template | 2 + .../template_catalog/templates/job2.template | 2 + .../template_catalog/templates/job3.template | 2 + .../template_catalog/templates/job4.template | 2 + .../template_catalog/test-template/flow.conf | 30 +- .../test-template/jobs/job1.conf | 2 - .../test-template/jobs/job1.job | 1 + .../test-template/jobs/job2.conf | 3 - .../test-template/jobs/job2.job | 3 + .../test-template/jobs/job3.conf | 3 - .../test-template/jobs/job3.job | 2 + .../test-template/jobs/job4.conf | 3 - .../test-template/jobs/job4.job | 2 + 89 files changed, 3082 insertions(+), 1082 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java index a1337fd..5e132fe 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java +++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/HOCONInputStreamJobTemplate.java @@ -51,7 +51,7 @@ public class HOCONInputStreamJobTemplate extends StaticJobTemplate { this(ConfigFactory.parseReader(new InputStreamReader(inputStream, Charsets.UTF_8)), uri, catalog); } - private HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog) + public HOCONInputStreamJobTemplate(Config config, URI uri, JobCatalogWithTemplates catalog) throws SpecNotFoundException, TemplateException { super(uri, config.hasPath(VERSION_KEY) ? config.getString(VERSION_KEY) : DEFAULT_VERSION, config.hasPath(ConfigurationKeys.JOB_DESCRIPTION_KEY) ? config.getString(ConfigurationKeys.JOB_DESCRIPTION_KEY) : "", http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java index 2361edc..c4d3656 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java @@ -107,7 +107,7 @@ public abstract class GitMonitoringService extends AbstractIdleService { ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor"))); } - synchronized void setActive(boolean isActive) { + public synchronized void setActive(boolean isActive) { if (this.isActive == isActive) { // No-op if already in correct state return; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java 
---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java deleted file mode 100644 index 7d7e2b4..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseHdfsDatasetDescriptor.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.dataset; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; - -import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; -import org.apache.gobblin.util.ConfigUtils; - -import lombok.Getter; - - -/** - * An implementation of {@link HdfsDatasetDescriptor}. 
- */ -@Alpha -public class BaseHdfsDatasetDescriptor implements HdfsDatasetDescriptor { - @Getter - private final String path; - @Getter - private final String format; - @Getter - private final String description; - @Getter - private final String platform; - - public BaseHdfsDatasetDescriptor(Config config) { - Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PATH_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.PATH_KEY)); - Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.FORMAT_KEY), String.format("Missing required property %s", DatasetDescriptorConfigKeys.FORMAT_KEY)); - - this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, null); - this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, null); - this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, ""); - this.platform = "hdfs"; - } - - /** - * A {@link HdfsDatasetDescriptor} is compatible with another {@link DatasetDescriptor} iff they have identical - * platform, type, path, and format. - * TODO: Currently isCompatibleWith() only checks if HDFS paths described by the two {@link DatasetDescriptor}s - * being compared are identical. Need to enhance this for the case of where paths can contain glob patterns. - * e.g. paths described by the pattern /data/input/* are a subset of paths described by /data/* and hence, the - * two descriptors should be compatible. - * @return true if this {@link HdfsDatasetDescriptor} is compatibe with another {@link DatasetDescriptor}. 
- */ - @Override - public boolean isCompatibleWith(DatasetDescriptor o) { - return this.equals(o); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - HdfsDatasetDescriptor other = (HdfsDatasetDescriptor) o; - if(this.getPlatform() == null || other.getPlatform() == null) { - return false; - } - if(!this.getPlatform().equalsIgnoreCase(other.getPlatform()) || !(o instanceof HdfsDatasetDescriptor)) { - return false; - } - - return this.getPath().equals(other.getPath()) && this.getFormat().equalsIgnoreCase(other.getFormat()); - } - - @Override - public String toString() { - return "(" + Joiner.on(",").join(this.getPlatform(),this.getPath(),this.getFormat()) + ")"; - } - - @Override - public int hashCode() { - return this.toString().hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java index 4a322e6..e8474e3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java @@ -17,28 +17,54 @@ package org.apache.gobblin.service.modules.dataset; +import com.typesafe.config.Config; + import org.apache.gobblin.annotation.Alpha; /** - * The interface for dataset descriptors. + * The interface for dataset descriptors. Each dataset is described in terms of the following attributes: + * <ul> + * <p> platform (e.g. HDFS, ADLS, JDBC). </p> + * <p> path, which describes the fully qualified name of the dataset. 
</p> + * <p> a format descriptor, which encapsulates its representation (e.g. avro, csv), codec (e.g. gzip, deflate), and + * encryption config (e.g. aes_rotating, gpg). </p> + * </ul> */ @Alpha public interface DatasetDescriptor { /** - * @return the dataset platform i.e. the storage backing the dataset (e.g. HDFS, JDBC, Espresso etc.) + * @return the dataset platform i.e. the storage system backing the dataset (e.g. HDFS, ADLS, JDBC etc.) */ public String getPlatform(); /** + * Returns the fully qualified name of a dataset. The fully qualified name is the absolute directory path of a dataset + * when the dataset is backed by a FileSystem. In the case of a database table, it is dbName.tableName. + * @return dataset path. + */ + public String getPath(); + + /** + * + * @return storage format of the dataset. + */ + public FormatConfig getFormatConfig(); + + /** * @return a human-readable description of the dataset. */ public String getDescription(); /** - * @return true if this {@link DatasetDescriptor} is compatible with the other {@link DatasetDescriptor} i.e. the - * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other {@link DatasetDescriptor}. - * This check is non-commutative. + * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the + * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other + * {@link DatasetDescriptor}. This operation is non-commutative. + */ + public boolean contains(DatasetDescriptor other); + + /** + * @return the raw config. 
*/ - public boolean isCompatibleWith(DatasetDescriptor other); + public Config getRawConfig(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java new file mode 100644 index 0000000..21c7c17 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.dataset; + +import com.google.common.base.Joiner; +import com.typesafe.config.Config; + +import lombok.Getter; + +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class EncryptionConfig { + @Getter + private final String encryptionAlgorithm; + @Getter + private final String keystoreType; + @Getter + private final String keystoreEncoding; + + public EncryptionConfig(Config encryptionConfig) { + this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, + DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, + DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, + DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + } + + public boolean contains(EncryptionConfig other) { + if (other == null) { + return false; + } + + String otherEncryptionAlgorithm = other.getEncryptionAlgorithm(); + String otherKeystoreType = other.getKeystoreType(); + String otherKeystoreEncoding = other.getKeystoreEncoding(); + + return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm()) + || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm)) + && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType()) + || this.keystoreType.equalsIgnoreCase(otherKeystoreType)) + && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding()) + || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof 
EncryptionConfig)) { + return false; + } + EncryptionConfig other = (EncryptionConfig) o; + return this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) && this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding()) + && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType()); + } + + @Override + public String toString() { + return "(" + Joiner.on(",").join(this.encryptionAlgorithm, this.keystoreType, this.keystoreEncoding) + ")"; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode(); + result = 31 * result + keystoreType.toLowerCase().hashCode(); + result = 31 * result + keystoreEncoding.toLowerCase().hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java new file mode 100644 index 0000000..a5cb717 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.dataset; + +import org.apache.hadoop.fs.GlobPattern; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import lombok.Getter; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; + + +/** + * An implementation of {@link DatasetDescriptor} with FS-based storage. 
+ */ +@Alpha +public class FSDatasetDescriptor implements DatasetDescriptor { + @Getter + private final String platform; + @Getter + private final String path; + @Getter + private final FormatConfig formatConfig; + @Getter + private final String description; + @Getter + private final Config rawConfig; + + public FSDatasetDescriptor(Config config) { + Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY), "Dataset descriptor config must specify platform"); + this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY); + this.path = PathUtils.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, + DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString(); + this.formatConfig = new FormatConfig(config); + this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, ""); + this.rawConfig = config; + } + + /** + * A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of paths + * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor} + * is a glob pattern, we return false. + * + * @param otherPath a glob pattern that describes a set of paths. + * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}. 
+ */ + public boolean isPathContaining(String otherPath) { + if (otherPath == null) { + return false; + } + if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) { + return true; + } + if (PathUtils.isGlob(new Path(otherPath))) { + return false; + } + GlobPattern globPattern = new GlobPattern(this.getPath()); + return globPattern.matches(otherPath); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean contains(DatasetDescriptor o) { + if (this == o) { + return true; + } + if (!(o instanceof FSDatasetDescriptor)) { + return false; + } + FSDatasetDescriptor other = (FSDatasetDescriptor) o; + + if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) { + return false; + } + + return getFormatConfig().contains(other.getFormatConfig()) && isPathContaining(other.getPath()); + } + + /** + * + * @param o + * @return true iff "this" dataset descriptor is compatible with the "other" and the "other" dataset descriptor is + * compatible with this dataset descriptor. 
+ */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof FSDatasetDescriptor)) { + return false; + } + FSDatasetDescriptor other = (FSDatasetDescriptor) o; + if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) { + return false; + } + return this.getPath().equals(other.getPath()) && this.getFormatConfig().equals(other.getFormatConfig()); + } + + @Override + public String toString() { + return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormatConfig().toString()) + ")"; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + platform.toLowerCase().hashCode(); + result = 31 * result + path.hashCode(); + result = 31 * result + getFormatConfig().hashCode(); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java new file mode 100644 index 0000000..a36182c --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.dataset; + +import com.google.common.base.Joiner; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.Getter; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * A location-independent descriptor of a dataset, which describes a dataset in terms of its physical attributes. + * The physical attributes include: + * <ul> + * <p> Data format (e.g. Avro, CSV, JSON). </p> + * <p> Data encoding type (e.g. Gzip, Bzip2, Base64, Deflate). </p> + * <p> Encryption properties (e.g. aes_rotating, gpg). 
</p> + * </ul> + */ +@Alpha +public class FormatConfig { + @Getter + private final String format; + @Getter + private final String codecType; + @Getter + private final EncryptionConfig encryptionConfig; + + public FormatConfig(Config config) { + this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + this.codecType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory + .empty())); + } + + public boolean contains(FormatConfig other) { + return containsFormat(other.getFormat()) && containsCodec(other.getCodecType()) + && containsEncryptionConfig(other.getEncryptionConfig()); + } + + private boolean containsFormat(String otherFormat) { + return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat()) + || (this.getFormat().equalsIgnoreCase(otherFormat)); + } + + private boolean containsCodec(String otherCodecType) { + return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType()) + || (this.getCodecType().equalsIgnoreCase(otherCodecType)); + } + + private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) { + return this.getEncryptionConfig().contains(otherEncryptionConfig); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof FormatConfig)) { + return false; + } + FormatConfig other = (FormatConfig) o; + return this.getFormat().equalsIgnoreCase(other.getFormat()) && this.getCodecType().equalsIgnoreCase(other.getCodecType()) + && this.getEncryptionConfig().equals(other.getEncryptionConfig()); + } + + @Override + public String toString() { + return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(), 
this.getEncryptionConfig().toString()) + ")"; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + codecType.toLowerCase().hashCode(); + result = 31 * result + format.toLowerCase().hashCode(); + result = 31 * result + encryptionConfig.hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java deleted file mode 100644 index 6f1970c..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HdfsDatasetDescriptor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.service.modules.dataset; - -import org.apache.gobblin.annotation.Alpha; - - -/** - * A descriptor interface for HDFS datasets - */ -@Alpha -public interface HdfsDatasetDescriptor extends DatasetDescriptor { - /** - * - * @return dataset path. - */ - public String getPath(); - - /** - * - * @return storage format of the dataset. - */ - public String getFormat(); - -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java new file mode 100644 index 0000000..daff8ce --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+
+
+/**
+ * A helper class used to maintain additional context associated with each {@link FlowEdge} during path
+ * computation while the edge is explored for its eligibility. The additional context includes the input
+ * {@link DatasetDescriptor} of this edge which is compatible with the previous {@link FlowEdge}'s output
+ * {@link DatasetDescriptor} (where "previous" means the immediately preceding {@link FlowEdge} visited before
+ * the current {@link FlowEdge}), and the corresponding output dataset descriptor of the current {@link FlowEdge}.
+ *
+ * <p>Lombok generates getters for all fields and an all-arguments constructor (in field declaration order).
+ * Equality and hash code are derived only from {@code edge}, {@code inputDatasetDescriptor} and
+ * {@code outputDatasetDescriptor}: {@code mergedConfig} and {@code specExecutor} are excluded, so two contexts that
+ * differ only in their merged config or executor compare equal (and collide as keys in a hash map).</p>
+ */
+@AllArgsConstructor
+@EqualsAndHashCode(exclude = {"mergedConfig", "specExecutor"})
+@Getter
+public class FlowEdgeContext {
+  //The flow edge this context describes.
+  private FlowEdge edge;
+  //Input dataset descriptor of this edge, compatible with the previous edge's output descriptor.
+  private DatasetDescriptor inputDatasetDescriptor;
+  //Dataset descriptor produced by traversing this edge.
+  private DatasetDescriptor outputDatasetDescriptor;
+  //Config used to resolve this edge's flow template into job configs; excluded from equals/hashCode.
+  private Config mergedConfig;
+  //SpecExecutor on which this edge's job execution plans are created; excluded from equals/hashCode.
+  private SpecExecutor specExecutor;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java new file mode 100644 index 0000000..c642708 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+/**
+ * A class that returns a {@link Dag} of {@link JobExecutionPlan}s from a sequence of edges
+ * represented as a {@link List} of {@link FlowEdgeContext}s.
+ */
+public class FlowGraphPath {
+  //Hops of the path, in traversal order.
+  private final List<FlowEdgeContext> path;
+  private final FlowSpec flowSpec;
+  //Passed to every JobExecutionPlan created for this path.
+  private final Long flowExecutionId;
+
+  public FlowGraphPath(List<FlowEdgeContext> path, FlowSpec flowSpec, Long flowExecutionId) {
+    this.path = path;
+    this.flowSpec = flowSpec;
+    this.flowExecutionId = flowExecutionId;
+  }
+
+  /**
+   * Converts every hop of this path into a {@link Dag} of {@link JobExecutionPlan}s and concatenates the per-hop
+   * {@link Dag}s in path order.
+   * @return the concatenated {@link Dag}; an empty {@link Dag} if the path has no edges.
+   */
+  public Dag<JobExecutionPlan> asDag()
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>());
+    Iterator<FlowEdgeContext> pathIterator = path.iterator();
+    while (pathIterator.hasNext()) {
+      Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next());
+      flowDag = flowDag.concatenate(flowEdgeDag);
+    }
+    return flowDag;
+  }
+
+  /**
+   * Given an instance of {@link FlowEdgeContext}, this method returns a {@link Dag} of {@link JobExecutionPlan}s
+   * that moves data from the source of the context's {@link FlowEdge} to the destination of that {@link FlowEdge}.
+   * The edge's {@link FlowTemplate} is resolved against the context's merged config and input/output
+   * {@link DatasetDescriptor}s, and each resolved job config becomes one {@link JobExecutionPlan} on the context's
+   * {@link SpecExecutor}.
+   * @param flowEdgeContext an instance of {@link FlowEdgeContext}.
+   * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge}.
+   */
+  private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext)
+      throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
+    DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
+    DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+    Config mergedConfig = flowEdgeContext.getMergedConfig();
+    SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
+
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    //Get resolved job configs from the flow template
+    List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
+    //Iterate over each resolved job config and convert the config to a JobSpec.
+    for (Config resolvedJobConfig : resolvedJobConfigs) {
+      jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId));
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java new file mode 100644 index 0000000..2b4746c --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Searches a {@link FlowGraph} for a path of {@link FlowEdge}s from the source {@link DataNode} to the destination
+ * {@link DataNode} named in a {@link FlowSpec}'s config, such that the flow's input dataset descriptor is transformed
+ * into its output dataset descriptor along the way.
+ */
+@Alpha
+@Slf4j
+public class FlowGraphPathFinder {
+  private static final String SOURCE_PREFIX = "source";
+  private static final String DESTINATION_PREFIX = "destination";
+
+  private final FlowGraph flowGraph;
+  private final FlowSpec flowSpec;
+  private final Config flowConfig;
+
+  private final DataNode srcNode;
+  private final DataNode destNode;
+
+  private DatasetDescriptor srcDatasetDescriptor;
+  private DatasetDescriptor destDatasetDescriptor;
+
+  //Maps each discovered FlowEdgeContext to its parent edge in the search; edges leaving the source node map to
+  // themselves. Rebuilt on every call to findPath().
+  private Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+
+  //Flow Execution Id
+  private Long flowExecutionId;
+
+  /**
+   * Constructor.
+   * @param flowGraph the {@link FlowGraph} to search.
+   * @param flowSpec the {@link FlowSpec} whose config identifies the source/destination nodes and the input/output
+   *                 dataset descriptors.
+   * @throws IllegalArgumentException if the source or destination node is not present in the flow graph.
+   * @throws RuntimeException wrapping any failure to load or instantiate either dataset descriptor class.
+   */
+  public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) {
+    this.flowGraph = flowGraph;
+    this.flowSpec = flowSpec;
+    this.flowConfig = flowSpec.getConfig();
+
+    //Get src/dest DataNodes from the flow config
+    String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+    String destNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+    this.srcNode = this.flowGraph.getNode(srcNodeId);
+    Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id %s", srcNodeId);
+    this.destNode = this.flowGraph.getNode(destNodeId);
+    Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id %s", destNodeId);
+
+    //Get src/dest dataset descriptors from the flow config
+    Config srcDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config destDatasetDescriptorConfig =
+        flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
+    try {
+      Class<?> srcDatasetDescriptorClass =
+          Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+      this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+          .invokeLongestConstructor(srcDatasetDescriptorClass, srcDatasetDescriptorConfig);
+      Class<?> destDatasetDescriptorClass =
+          Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+      this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
+          .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s
+   * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are
+   * added first to the queue. This ensures that dataset transformations are always performed closest to the source.
+   * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode.
+   */
+  public FlowGraphPath findPath() throws PathFinderException {
+    try {
+      //Initialization of auxiliary data structures used for path computation
+      this.pathMap = new HashMap<>();
+
+      // Generate flow execution id for this compilation
+      this.flowExecutionId = System.currentTimeMillis();
+
+      //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+      // flow graph.
+      // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will
+      // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers.
+      synchronized (this.flowGraph) {
+        //Base condition 1: Source Node or Dest Node is inactive; return null
+        if (!srcNode.isActive() || !destNode.isActive()) {
+          log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(),
+              this.destNode.getId());
+          return null;
+        }
+
+        //Base condition 2: Check if we are already at the target. If so, return an empty path.
+        if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+          return new FlowGraphPath(new ArrayList<>(), flowSpec, flowExecutionId);
+        }
+
+        LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
+        edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+        for (FlowEdgeContext flowEdgeContext : edgeQueue) {
+          this.pathMap.put(flowEdgeContext, flowEdgeContext);
+        }
+
+        //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges
+        // to the edge E. For each adjacent edge E', do the following:
+        // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and
+        // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
+        // edge E'. If yes, add the edge E' to the edge queue.
+        // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
+        while (!edgeQueue.isEmpty()) {
+          FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+
+          DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+          DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+
+          //Are we done?
+          if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+            return constructPath(flowEdgeContext);
+          }
+
+          //Expand the currentNode to its adjacent edges and add them to the queue.
+          List<FlowEdgeContext> nextEdges =
+              getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
+          for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+            //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
+            // queue.
+            if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+              edgeQueue.add(childFlowEdgeContext);
+              this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+            }
+          }
+        }
+      }
+      //No path found. Return null.
+      return null;
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+      throw new PathFinderException(
+          "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + this.destNode.getId(), e);
+    }
+  }
+
+  /**
+   * @return true if the search has reached its target, i.e. the current node equals the destination node and the
+   *         current dataset descriptor equals the destination dataset descriptor.
+   */
+  private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    return currentNode.equals(destNode) && currentDatasetDescriptor.equals(destDatasetDescriptor);
+  }
+
+  /**
+   * A helper method that collects the traversable {@link FlowEdge}s leaving dataNode, wrapped as
+   * {@link FlowEdgeContext}s. An edge is traversable when it and its destination node are active and, for the first
+   * {@link SpecExecutor} that resolves its flow template, some resolving input {@link DatasetDescriptor} contains
+   * currentDatasetDescriptor. Edges whose output format config is contained in destDatasetDescriptor's format config
+   * are placed at the front of the returned list. Edges whose evaluation throws one of the caught exceptions are
+   * logged and skipped.
+   * @param dataNode the {@link DataNode} whose edges are expanded.
+   * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge.
+   * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
+   * @return prioritized list of {@link FlowEdgeContext}s to be added to the edge queue for expansion.
+   */
+  private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
+      DatasetDescriptor destDatasetDescriptor) {
+    List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+    for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+      try {
+        DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
+        //Base condition: Skip this FlowEdge, if it is inactive or if the destination of this edge is inactive.
+        if (!edgeDestination.isActive() || !flowEdge.isActive()) {
+          continue;
+        }
+
+        boolean foundExecutor = false;
+        //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template.
+        for (SpecExecutor specExecutor: flowEdge.getExecutors()) {
+          Config mergedConfig = getMergedConfig(flowEdge, specExecutor);
+          List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs =
+              flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+          for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) {
+            DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
+            DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
+            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+              FlowEdgeContext flowEdgeContext;
+              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+                //If datasets described by the currentDatasetDescriptor is a subset of the datasets described
+                // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
+                // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward.
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, specExecutor);
+              } else {
+                //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge)
+                flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, specExecutor);
+              }
+              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+                //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
+                // with those of destination dataset descriptor.
+                // In other words, we prioritize edges that perform data transformations as close to the source as possible.
+                prioritizedEdgeList.add(0, flowEdgeContext);
+              } else {
+                prioritizedEdgeList.add(flowEdgeContext);
+              }
+              foundExecutor = true;
+            }
+          }
+          // Found a SpecExecutor. Proceed to the next FlowEdge.
+          // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
+          if (foundExecutor) {
+            break;
+          }
+        }
+      } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
+          | JobTemplate.TemplateException e) {
+        if (e instanceof InterruptedException) {
+          //Restore the interrupt status so the interruption stays visible to callers instead of being swallowed here.
+          Thread.currentThread().interrupt();
+        }
+        //Skip the edge; and continue. The exception is passed without a placeholder so SLF4J logs its stack trace.
+        log.warn("Skipping edge {} with config {} due to exception.", flowEdge.getId(), flowConfig.toString(), e);
+      }
+    }
+    return prioritizedEdgeList;
+  }
+
+  /**
+   * Build the merged config for each {@link FlowEdge}. It combines the following, listed from highest to lowest
+   * precedence (values from earlier entries override later ones via {@link Config#withFallback}):
+   * <ul>
+   *   <li> the user provided flow config </li>
+   *   <li> spec executor config/overrides </li>
+   *   <li> edge specific properties/overrides </li>
+   *   <li> source node config, nested under the "source" path </li>
+   *   <li> destination node config, nested under the "destination" path </li>
+   * </ul>
+   * Each {@link JobTemplate}'s config will eventually be resolved against this merged config.
+   * @param flowEdge An instance of {@link FlowEdge}.
+   * @param specExecutor A {@link SpecExecutor}.
+   * @return the merged config derived as described above.
+   * @throws ExecutionException if retrieving the {@link SpecExecutor}'s config fails.
+   * @throws InterruptedException if interrupted while waiting for the {@link SpecExecutor}'s config.
+   */
+  private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor)
+      throws ExecutionException, InterruptedException {
+    Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
+    Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
+    Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig())
+        .withFallback(srcNodeConfig).withFallback(destNodeConfig);
+    return mergedConfig;
+  }
+
+  /**
+   * Reconstructs the path ending at the given edge by following the parent links in the path map back to an edge
+   * leaving the source node.
+   * @param flowEdgeContext of the last {@link FlowEdge} in the path.
+   * @return a {@link FlowGraphPath} holding the path's {@link FlowEdgeContext}s in order from source to destination.
+   * @throws IOException
+   * @throws SpecNotFoundException
+   * @throws JobTemplate.TemplateException
+   * @throws URISyntaxException
+   */
+  private FlowGraphPath constructPath(FlowEdgeContext flowEdgeContext)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    //Backtrace from the last edge using the path map, prepending each parent so the path reads source-to-destination.
+    List<FlowEdgeContext> path = new LinkedList<>();
+    path.add(flowEdgeContext);
+    FlowEdgeContext currentFlowEdgeContext = flowEdgeContext;
+    //The first edge in the path is its own parent in the path map. Checking for it before prepending guarantees that
+    // every edge appears exactly once, including when the path consists of a single edge.
+    while (!this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) {
+      currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext);
+      path.add(0, currentFlowEdgeContext);
+    }
+    FlowGraphPath flowGraphPath = new FlowGraphPath(path, flowSpec, flowExecutionId);
+    return flowGraphPath;
+  }
+
+  /**
+   * Signals a failure while computing a path in the {@link FlowGraph}.
+   */
+  public static class PathFinderException extends Exception {
+    public PathFinderException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public PathFinderException(String message) {
+      super(message);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java new file mode 100644 index 0000000..8b14b10 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+
+
+/**
+ * Take in a logical {@link Spec}, i.e. a flow, and compile the corresponding materialized job {@link Spec}s
+ * and their mapping to {@link SpecExecutor}s by finding a path through a {@link FlowGraph}.
+ */
+@Alpha
+@Slf4j
+public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
+  @Getter
+  private final FlowGraph flowGraph;
+  private final GitFlowGraphMonitor gitFlowGraphMonitor;
+  @Getter
+  private boolean active;
+
+  public MultiHopFlowCompiler(Config config) {
+    this(config, true);
+  }
+
+  public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
+    this(config, Optional.<Logger>absent(), instrumentationEnabled);
+  }
+
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
+    this(config, log, true);
+  }
+
+  /**
+   * Creates the compiler with an empty {@link BaseFlowGraph} and a {@link GitFlowGraphMonitor} bound to that graph
+   * and to an {@link FSFlowCatalog} rooted at the configured template catalog path.
+   * @throws RuntimeException if the {@link FSFlowCatalog} cannot be created.
+   */
+  public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
+    super(config, log, instrumentationEnabled);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    FSFlowCatalog flowCatalog;
+    try {
+      flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+    }
+    this.flowGraph = new BaseFlowGraph();
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph);
+  }
+
+  /**
+   * Sets the active flag of this compiler and propagates it to the {@link GitFlowGraphMonitor}.
+   */
+  public void setActive(boolean active) {
+    this.active = active;
+    this.gitFlowGraphMonitor.setActive(active);
+  }
+
+  /**
+   * Compiles the given flow by finding a path in the {@link FlowGraph} from the flow's source to its destination and
+   * converting that path into {@link JobExecutionPlan}s.
+   *
+   * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture
+   * job dependencies.
+   * @param spec the {@link FlowSpec} to compile; must be non-null and an instance of {@link FlowSpec}.
+   * @return an insertion-ordered map from each compiled job {@link Spec} to its {@link SpecExecutor}, or null if no
+   *         path exists or compilation fails with one of the caught exceptions.
+   */
+  @Override
+  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+    Preconditions.checkNotNull(spec);
+    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowToJobSpecCompiler only accepts FlowSpecs");
+
+    long startTime = System.nanoTime();
+    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
+
+    FlowSpec flowSpec = (FlowSpec) spec;
+    String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
+    String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    log.info("Compiling flow for source: {} and destination: {}", source, destination);
+
+    FlowGraphPathFinder pathFinder;
+    try {
+      pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec);
+    } catch (RuntimeException e) {
+      //E.g. an unknown source/destination node or an unloadable dataset descriptor. Count the failed compilation,
+      // then propagate the exception unchanged.
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      throw e;
+    }
+    try {
+      //Compute the path from source to destination.
+      FlowGraphPath flowGraphPath = pathFinder.findPath();
+
+      //Convert the path into a Dag of JobExecutionPlans.
+      Dag<JobExecutionPlan> jobExecutionPlanDag;
+      if (flowGraphPath != null) {
+        jobExecutionPlanDag = flowGraphPath.asDag();
+      } else {
+        Instrumented.markMeter(this.flowCompilationFailedMeter);
+        log.info("No path found from source: {} and destination: {}", source, destination);
+        return null;
+      }
+
+      //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead
+      // of a Map. For now just add all specs into the map.
+      for (Dag.DagNode<JobExecutionPlan> node: jobExecutionPlanDag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = node.getValue();
+        specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor());
+      }
+    } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException
+        | URISyntaxException e) {
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      log.error("Exception encountered while compiling flow for source: {} and destination: {}", source, destination, e);
+      return null;
+    }
+    Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
+    Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    return specExecutorMap;
+  }
+
+  @Override
+  protected void populateEdgeTemplateMap() {
+    log.warn("No population of templates based on edge happen in this implementation");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java index 731bc22..4fb9711 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java @@ -19,9 +19,9 @@ package org.apache.gobblin.service.modules.flowgraph; import com.google.common.base.Preconditions; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; import org.apache.gobblin.util.ConfigUtils; import
joptsimple.internal.Strings; @@ -38,7 +38,7 @@ public class BaseDataNode implements DataNode { @Getter private String id; @Getter - private Config props; + private Config rawConfig; @Getter private boolean active = true; @@ -50,8 +50,8 @@ public class BaseDataNode implements DataNode { if (nodeProps.hasPath(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY)) { this.active = nodeProps.getBoolean(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY); } - this.props = nodeProps; - } catch(Exception e) { + this.rawConfig = nodeProps; + } catch (Exception e) { throw new DataNodeCreationException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java index fc82cc1..56f6c1b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java @@ -44,7 +44,10 @@ import org.apache.gobblin.util.ConfigUtils; @Alpha public class BaseFlowEdge implements FlowEdge { @Getter - protected List<String> endPoints; + protected String src; + + @Getter + protected String dest; @Getter protected FlowTemplate flowTemplate; @@ -53,7 +56,7 @@ public class BaseFlowEdge implements FlowEdge { private List<SpecExecutor> executors; @Getter - private Config props; + private Config config; @Getter private String id; @@ -63,11 +66,12 @@ public class BaseFlowEdge implements FlowEdge { //Constructor public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) { - this.endPoints = endPoints; + 
+ this.src = endPoints.get(0); + this.dest = endPoints.get(1); this.flowTemplate = flowTemplate; this.executors = executors; this.active = active; - this.props = properties; + this.config = properties; this.id = edgeId; } @@ -91,7 +95,7 @@ public class BaseFlowEdge implements FlowEdge { FlowEdge that = (FlowEdge) o; - if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) { + if (!(this.getSrc().equals(that.getSrc()) && this.getDest().equals(that.getDest()))) { return false; } @@ -140,14 +144,14 @@ public class BaseFlowEdge implements FlowEdge { specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); } - String flowTemplateUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY, ""); + String flowTemplateDirUri = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY, ""); //Perform basic validation Preconditions.checkArgument(endPoints.size() == 2, "A FlowEdge must have 2 end points"); Preconditions - .checkArgument(!Strings.isNullOrEmpty(flowTemplateUri), "FlowTemplate URI must be not null or empty"); + .checkArgument(!Strings.isNullOrEmpty(flowTemplateDirUri), "FlowTemplate URI must be not null or empty"); boolean isActive = ConfigUtils.getBoolean(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true); //Build SpecExecutor from config @@ -158,7 +162,7 @@ public class BaseFlowEdge implements FlowEdge { SpecExecutor executor = (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(executorClass, specExecutorConfig); specExecutors.add(executor); } - FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new
URI(flowTemplateDirUri)); return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive); } catch (RuntimeException e) { throw e; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java index 783f7ea..edf40cc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java @@ -75,8 +75,8 @@ public class BaseFlowGraph implements FlowGraph { */ @Override public synchronized boolean addFlowEdge(FlowEdge edge) { - String srcNode = edge.getEndPoints().get(0); - String dstNode = edge.getEndPoints().get(1); + String srcNode = edge.getSrc(); + String dstNode = edge.getDest(); if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) { return false; } @@ -153,10 +153,10 @@ public class BaseFlowGraph implements FlowGraph { * if the {@link FlowEdge} is not in the graph, return false. 
*/ public synchronized boolean deleteFlowEdge(FlowEdge edge) { - if(!dataNodeMap.containsKey(edge.getEndPoints().get(0))) { + if(!dataNodeMap.containsKey(edge.getSrc())) { return false; } - DataNode node = dataNodeMap.get(edge.getEndPoints().get(0)); + DataNode node = dataNodeMap.get(edge.getSrc()); if(!nodesToEdges.get(node).contains(edge)) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 8ae0027..58bbb81 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -87,6 +87,10 @@ public class Dag<T> { return node.parentNodes; } + public boolean isEmpty() { + return this.nodes.isEmpty(); + } + /** * Concatenate two dags together. Join the "other" dag to "this" dag and return "this" dag. 
* The concatenate method ensures that all the jobs of "this" dag (which may have multiple end nodes) @@ -97,9 +101,12 @@ public class Dag<T> { * @return the concatenated dag */ public Dag<T> concatenate(Dag<T> other) throws IOException { - if (other == null) { + if (other == null || other.isEmpty()) { return this; } + if (this.isEmpty()) { + return other; + } for (DagNode node : this.endNodes) { this.parentChildMap.put(node, Lists.newArrayList()); for (DagNode otherNode : other.startNodes) { @@ -108,6 +115,7 @@ public class Dag<T> { } this.endNodes = other.endNodes; } + this.nodes.addAll(other.nodes); return this; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java index 4931685..b7a5274 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java @@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flowgraph; import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; /** @@ -37,7 +36,7 @@ public interface DataNode { * @return the attributes of a {@link DataNode}. It also includes properties for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate} * e.g. "source.fs.uri" for an HDFS node, "jdbc.publisher.url" for JDBC node. 
*/ - Config getProps(); + Config getRawConfig(); /** * @return true if the {@link DataNode} is active http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java index e98337d..23e20c8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java @@ -22,7 +22,25 @@ package org.apache.gobblin.service.modules.flowgraph; */ public class DatasetDescriptorConfigKeys { //Gobblin Service Dataset Descriptor related keys + public static final String FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.input.dataset.descriptor"; + public static final String FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.output.dataset.descriptor"; + + public static final String FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.input.dataset.descriptor"; + public static final String FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX = "gobblin.flow.edge.output.dataset.descriptor"; + + public static final String CLASS_KEY = "class"; + public static final String PLATFORM_KEY = "platform"; public static final String PATH_KEY = "path"; public static final String FORMAT_KEY = "format"; + public static final String CODEC_KEY = "codec"; public static final String DESCRIPTION_KEY = "description"; + + //Dataset encryption related keys + public static final String ENCYPTION_PREFIX = "encrypt"; + public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm"; + public static final String 
ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type"; + public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = "keystore_encoding"; + + public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any"; + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java deleted file mode 100644 index 5899645..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FileSystemDataNode.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.service.modules.flowgraph; - -import java.io.IOException; -import java.net.URI; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; - -import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.util.ConfigUtils; - -import joptsimple.internal.Strings; -import lombok.Getter; - - -/** - * An abstract {@link FileSystemDataNode} implementation. In addition to the required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode} - * must have a FS URI specified. Example implementations of {@link FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}. - */ -@Alpha -public abstract class FileSystemDataNode extends BaseDataNode { - public static final String FS_URI_KEY = "fs.uri"; - @Getter - private String fsUri; - - /** - * Constructor. An HDFS DataNode must have fs.uri property specified in addition to a node Id. - */ - public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException { - super(nodeProps); - try { - this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, ""); - Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "FS URI cannot be null or empty for an HDFSDataNode"); - URI uri = new URI(this.fsUri); - if(!isUriValid(uri)) { - throw new IOException("Invalid FS URI " + this.fsUri); - } - } catch(Exception e) { - throw new DataNodeCreationException(e); - } - } - - public abstract boolean isUriValid(URI fsUri); - /** - * Two HDFS DataNodes are the same if they have the same id and the same fsUri. 
- */ - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FileSystemDataNode that = (FileSystemDataNode) o; - - return this.getId().equals(that.getId()) && fsUri.equals(that.getFsUri()); - } - - @Override - public int hashCode() { - return Joiner.on("-").join(this.getId(), this.fsUri).hashCode(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java index fb60d67..497bd5b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdge.java @@ -41,9 +41,15 @@ import org.apache.gobblin.service.modules.template.FlowTemplate; public interface FlowEdge { /** * - * @return the {@link DataNode} ids that are the end points of the edge. + * @return the source {@link DataNode} id of the edge. */ - List<String> getEndPoints(); + String getSrc(); + + /** + * + * @return the destination {@link DataNode} id of the edge. + */ + String getDest(); /** * @@ -62,7 +68,7 @@ public interface FlowEdge { * is instantiated. It also includes properties needed for resolving a {@link org.apache.gobblin.runtime.api.JobTemplate}. * @return the properties of this edge as a {@link Config} object. */ - Config getProps(); + Config getConfig(); /** * A string uniquely identifying the edge. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java index 23f5793..b4aa7bf 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java @@ -32,6 +32,13 @@ import org.apache.gobblin.annotation.Alpha; public interface FlowGraph { /** + * Get a {@link DataNode} from the node identifier + * @param nodeId {@link DataNode} identifier. + * @return the {@link DataNode} object if the node is present in the {@link FlowGraph}. + */ + public DataNode getNode(String nodeId); + + /** * Add a {@link DataNode} to the {@link FlowGraph} * @param node {@link DataNode} to be added * @return true if {@link DataNode} is added to the {@link FlowGraph} successfully. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java index cd4876a..8a49ec0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java @@ -22,7 +22,7 @@ public class FlowGraphConfigurationKeys { public static final String FLOW_EDGE_PREFIX = "flow.edge."; /** - * {@link DataNode} configuration keys. + * {@link DataNode} related configuration keys. */ public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class"; public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode"; @@ -30,7 +30,7 @@ public class FlowGraphConfigurationKeys { public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive"; /** - * {@link FlowEdge} configuration keys. + * {@link FlowEdge} related configuration keys. 
*/ public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class"; public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory"; @@ -39,7 +39,7 @@ public class FlowGraphConfigurationKeys { public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id"; public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name"; public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive"; - public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri"; + public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri"; public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors"; - public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass"; + public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java deleted file mode 100644 index 7bcc18d..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/HdfsDataNode.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.flowgraph; - -import java.net.URI; - -import com.typesafe.config.Config; - -import org.apache.gobblin.annotation.Alpha; - -import joptsimple.internal.Strings; - - -/** - * An implementation of {@link HdfsDataNode}. All the properties specific to a HDFS based data node (e.g. fs.uri) - * are validated here. - */ -@Alpha -public class HdfsDataNode extends FileSystemDataNode { - public static final String HDFS_SCHEME = "hdfs"; - - public HdfsDataNode(Config nodeProps) throws DataNodeCreationException { - super(nodeProps); - } - - /** - * - * @param fsUri FileSystem URI - * @return true if the scheme is "hdfs" and authority is not empty. 
- */ - @Override - public boolean isUriValid(URI fsUri) { - String scheme = fsUri.getScheme(); - //Check that the scheme is "hdfs" - if(!scheme.equals(HDFS_SCHEME)) { - return false; - } - //Ensure that the authority is not empty - if(Strings.isNullOrEmpty(fsUri.getAuthority())) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java deleted file mode 100644 index 6dc1aa3..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/LocalFSDataNode.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.flowgraph; - -import java.net.URI; - -import org.apache.gobblin.annotation.Alpha; - -import com.typesafe.config.Config; - -/** - * An implementation of {@link LocalFSDataNode}. 
All the properties specific to a LocalFS based data node (e.g. fs.uri) - * are validated here. - */ -@Alpha -public class LocalFSDataNode extends FileSystemDataNode { - public static final String LOCAL_FS_SCHEME = "file"; - - public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException { - super(nodeProps); - } - - /** - * - * @param fsUri FileSystem URI - * @return true if the scheme of fsUri equals "file" - */ - @Override - public boolean isUriValid(URI fsUri) { - String scheme = fsUri.getScheme(); - if(scheme.equals(LOCAL_FS_SCHEME)) { - return true; - } - return false; - } -}
