FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran
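This patch introduces a DATASOURCE entity type (datasource-0.1.xsd) plus an optional <import> policy on a feed's cluster, which together drive a Sqoop-based database-to-HDFS import workflow. As a minimal sketch of how the new pieces fit together (assuming EntityParser exposes parseAndValidate(InputStream), the entry point used for the other entity types; the class name is illustrative):

import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.falcon.entity.parser.EntityParser;
import org.apache.falcon.entity.parser.EntityParserFactory;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.datasource.Datasource;

public final class DatasourceParseSketch {
    public static void main(String[] args) throws Exception {
        // mysql_database.xml is the sample definition shipped with this patch
        try (InputStream in = new FileInputStream("mysql_database.xml")) {
            // EntityParserFactory now returns a DatasourceEntityParser for DATASOURCE
            EntityParser<?> parser = EntityParserFactory.getParser(EntityType.DATASOURCE);
            // validate() loads the configured driver jars through HdfsClassLoader
            // and test-connects to the readonly endpoint (see DatasourceEntityParser)
            Datasource ds = (Datasource) parser.parseAndValidate(in);
            System.out.println(ds.getName() + " -> " + ds.getType().value());
        }
    }
}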
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/89040a29 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/89040a29 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/89040a29 Branch: refs/heads/master Commit: 89040a296de3d4a9bd0aa2232342438add37afee Parents: 35006fe Author: Sowmya Ramesh <[email protected]> Authored: Wed Oct 28 18:08:31 2015 -0700 Committer: Sowmya Ramesh <[email protected]> Committed: Wed Oct 28 18:08:31 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../main/java/org/apache/falcon/LifeCycle.java | 3 +- client/src/main/java/org/apache/falcon/Tag.java | 2 +- .../org/apache/falcon/entity/v0/EntityType.java | 8 +- .../falcon/metadata/RelationshipType.java | 2 + client/src/main/resources/datasource-0.1.xsd | 256 ++++++++++++++++++ client/src/main/resources/feed-0.1.xsd | 93 ++++++- client/src/main/resources/jaxb-binding.xjb | 9 + client/src/main/resources/mysql_database.xml | 46 ++++ .../apache/falcon/entity/DatasourceHelper.java | 199 ++++++++++++++ .../org/apache/falcon/entity/EntityUtil.java | 21 +- .../org/apache/falcon/entity/FeedHelper.java | 191 +++++++++++++- .../entity/parser/DatasourceEntityParser.java | 127 +++++++++ .../entity/parser/EntityParserFactory.java | 2 + .../falcon/entity/parser/FeedEntityParser.java | 64 ++++- .../falcon/entity/store/ConfigurationStore.java | 2 +- .../apache/falcon/entity/v0/EntityGraph.java | 11 + .../entity/v0/EntityIntegrityChecker.java | 3 + .../EntityRelationshipGraphBuilder.java | 33 +++ .../InstanceRelationshipGraphBuilder.java | 33 +++ .../falcon/metadata/MetadataMappingService.java | 9 +- .../falcon/metadata/RelationshipLabel.java | 1 + .../org/apache/falcon/util/HdfsClassLoader.java | 159 +++++++++++ .../falcon/workflow/WorkflowExecutionArgs.java | 1 + .../workflow/WorkflowExecutionContext.java | 6 +- .../apache/falcon/entity/AbstractTestBase.java | 1 + .../apache/falcon/entity/EntityTypeTest.java | 3 + .../apache/falcon/entity/FeedHelperTest.java | 109 +++++++- .../parser/DatasourceEntityParserTest.java | 77 ++++++ .../entity/parser/FeedEntityParserTest.java | 159 ++++++++++- .../falcon/entity/v0/EntityGraphTest.java | 124 ++++++++- .../config/datasource/datasource-0.1.xml | 48 ++++ .../config/datasource/datasource-file-0.1.xml | 48 ++++ .../datasource/datasource-invalid-0.1.xml | 46 ++++ .../resources/config/feed/feed-import-0.1.xml | 74 ++++++ .../feed/feed-import-exclude-fields-0.1.xml | 74 ++++++ .../config/feed/feed-import-invalid-0.1.xml | 73 +++++ .../config/feed/feed-import-noargs-0.1.xml | 64 +++++ docs/src/site/twiki/EntitySpecification.twiki | 84 ++++++ docs/src/site/twiki/FalconCLI.twiki | 23 +- .../falcon/messaging/JMSMessageProducer.java | 3 +- .../oozie/DatabaseImportWorkflowBuilder.java | 174 ++++++++++++ .../oozie/FeedImportCoordinatorBuilder.java | 191 ++++++++++++++ .../falcon/oozie/ImportWorkflowBuilder.java | 84 ++++++ .../falcon/oozie/OozieCoordinatorBuilder.java | 3 + .../OozieOrchestrationWorkflowBuilder.java | 12 + .../feed/FSReplicationWorkflowBuilder.java | 3 +- .../falcon/oozie/feed/FeedBundleBuilder.java | 5 + .../feed/FeedRetentionWorkflowBuilder.java | 1 + .../feed/HCatReplicationWorkflowBuilder.java | 3 +- .../ProcessExecutionWorkflowBuilder.java | 2 + .../feed/import-sqoop-database-action.xml | 47 ++++ .../src/main/resources/action/post-process.xml | 2 + pom.xml | 3 + webapp/pom.xml | 3 + .../apache/falcon/lifecycle/FeedImportIT.java | 99 +++++++ 
.../org/apache/falcon/resource/TestContext.java | 3 + .../org/apache/falcon/util/HsqldbTestUtils.java | 263 +++++++++++++++++++ .../src/test/resources/datasource-template.xml | 46 ++++ webapp/src/test/resources/feed-template3.xml | 59 +++++ 60 files changed, 3253 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b1f30cf..b5980be 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1459 Ability to import from database(Venkat Ramachandran via Sowmya Ramesh) + FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/LifeCycle.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/LifeCycle.java b/client/src/main/java/org/apache/falcon/LifeCycle.java index 58a2a6c..d4d39e8 100644 --- a/client/src/main/java/org/apache/falcon/LifeCycle.java +++ b/client/src/main/java/org/apache/falcon/LifeCycle.java @@ -25,7 +25,8 @@ package org.apache.falcon; public enum LifeCycle { EXECUTION(Tag.DEFAULT), EVICTION(Tag.RETENTION), - REPLICATION(Tag.REPLICATION); + REPLICATION(Tag.REPLICATION), + IMPORT(Tag.IMPORT); private final Tag tag; http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/Tag.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/Tag.java b/client/src/main/java/org/apache/falcon/Tag.java index beeb812..5027ac0 100644 --- a/client/src/main/java/org/apache/falcon/Tag.java +++ b/client/src/main/java/org/apache/falcon/Tag.java @@ -24,7 +24,7 @@ import org.apache.falcon.entity.v0.EntityType; * Tag to include in the entity type. 
*/ public enum Tag { - DEFAULT(EntityType.PROCESS), RETENTION(EntityType.FEED), REPLICATION(EntityType.FEED); + DEFAULT(EntityType.PROCESS), RETENTION(EntityType.FEED), REPLICATION(EntityType.FEED), IMPORT(EntityType.FEED); private final EntityType entityType; http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java index 0657124..3d55547 100644 --- a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java +++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java @@ -21,6 +21,7 @@ package org.apache.falcon.entity.v0; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.datasource.Datasource; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; @@ -38,7 +39,8 @@ import java.util.Arrays; public enum EntityType { FEED(Feed.class, "/feed-0.1.xsd", "name"), PROCESS(Process.class, "/process-0.1.xsd", "name"), - CLUSTER(Cluster.class, "/cluster-0.1.xsd", "name"); + CLUSTER(Cluster.class, "/cluster-0.1.xsd", "name"), + DATASOURCE(Datasource.class, "/datasource-0.1.xsd", "name"); //Fail unmarshalling of whole xml if unmarshalling of any element fails private static class EventHandler implements ValidationEventHandler { @@ -93,8 +95,10 @@ public enum EntityType { return unmarshaller; } + public boolean isSchedulable() { - return this != EntityType.CLUSTER; + // Cluster and Datasource are not schedulable like Feed and Process + return ((this != EntityType.CLUSTER) && (this != EntityType.DATASOURCE)); } @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP"}) http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java index f034772..8e5f8ea 100644 --- a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java +++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java @@ -27,10 +27,12 @@ public enum RelationshipType { CLUSTER_ENTITY("cluster-entity"), FEED_ENTITY("feed-entity"), PROCESS_ENTITY("process-entity"), + DATASOURCE_ENTITY("datasource-entity"), // instance vertex types FEED_INSTANCE("feed-instance"), PROCESS_INSTANCE("process-instance"), + IMPORT_INSTANCE("import-instance"), // Misc vertex types USER("user"), http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/datasource-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/datasource-0.1.xsd b/client/src/main/resources/datasource-0.1.xsd new file mode 100644 index 0000000..beb82cc --- /dev/null +++ b/client/src/main/resources/datasource-0.1.xsd @@ -0,0 +1,256 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" attributeFormDefault="unqualified" elementFormDefault="qualified" + targetNamespace="uri:falcon:datasource:0.1" xmlns="uri:falcon:datasource:0.1" + xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" jaxb:version="2.1"> + <xs:annotation> + <xs:documentation> + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version + 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. + See the License for the specific language governing permissions and + limitations under the License. + </xs:documentation> + <xs:appinfo> + <jaxb:schemaBindings> + <jaxb:package name="org.apache.falcon.entity.v0.datasource"/> + </jaxb:schemaBindings> + </xs:appinfo> + </xs:annotation> + <xs:element name="datasource" type="datasource"> + </xs:element> + <xs:complexType name="datasource"> + <xs:annotation> + <xs:documentation>The datasource contains the information required + to connect to a data source like a MySQL database or a Kafka cluster. + A datasource is referenced by feeds that represent an object like + Table (or Topic) in the MySQL database (or Kafka Cluster). + name: the name of the datasource, which must be unique. + colo: the name of the colo to which this datasource belongs. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element type="KEY_VALUE_PAIR" name="tags" minOccurs="0" maxOccurs="1"> + <xs:annotation> + <xs:documentation> + tags: a datasource specifies an optional list of comma-separated tags + (key-value pairs) used for classification of the datasource entity. + Example: [email protected], [email protected], department=forecasting + </xs:documentation> + </xs:annotation> + </xs:element> + <xs:element type="interfaces" name="interfaces"/> + <xs:element type="driver" name="driver" minOccurs="1" maxOccurs="1" /> + <xs:element type="properties" name="properties" minOccurs="0"/> + <xs:element type="ACL" name="ACL" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + <xs:attribute type="IDENTIFIER" name="name" use="required"/> + <xs:attribute type="xs:string" name="colo" use="required"/> + <xs:attribute type="xs:string" name="description"/> + <xs:attribute type="datasource-type" name="type" use="required"> + <xs:annotation> + <xs:documentation> + datasource type could be relational databases (MySQL, Oracle etc.), messaging systems like + Kafka, etc.
+ </xs:documentation> + </xs:annotation> + </xs:attribute> + </xs:complexType> + <xs:complexType name="property"> + <xs:annotation> + <xs:documentation> + A key-value pair to pass in any datasource specific properties. + </xs:documentation> + </xs:annotation> + <xs:attribute type="xs:string" name="name" use="required"/> + <xs:attribute type="xs:string" name="value" use="required"/> + </xs:complexType> + <xs:complexType name="interface"> + <xs:annotation> + <xs:documentation> + An interface specifies the interface type (read or write), and an + endpoint url. Falcon uses these endpoints to import or export + data from datasources. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element type="driver" name="driver" minOccurs="0" maxOccurs="1" /> + <xs:element type="credential" name="credential" minOccurs="0" maxOccurs="1"/> + <xs:element type="properties" name="properties" minOccurs="0"/> + </xs:sequence> + <xs:attribute type="interfacetype" name="type" use="required"/> + <xs:attribute type="xs:string" name="endpoint" use="required"/> + </xs:complexType> + <xs:complexType name="properties"> + <xs:annotation> + <xs:documentation> + A list of property elements. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element type="property" name="property" maxOccurs="unbounded" minOccurs="0"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="interfaces"> + <xs:annotation> + <xs:documentation> + A list of interfaces. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element type="interface" name="interface" maxOccurs="2" minOccurs="1"/> + <xs:element type="credential" name="credential" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + <xs:simpleType name="interfacetype"> + <xs:annotation> + <xs:documentation> + A datasource interface has two types: readonly and write. + The readonly endpoint specifies the url/mechanism to use for the data IMPORT operation + from a datasource, while the write endpoint specifies the url/mechanism to use for the data + EXPORT operation. + </xs:documentation> + </xs:annotation> + <xs:restriction base="xs:string"> + <xs:enumeration value="readonly"/> + <xs:enumeration value="write"/> + </xs:restriction> + </xs:simpleType> + <xs:simpleType name="IDENTIFIER"> + <xs:restriction base="xs:string"> + <xs:pattern value="(([a-zA-Z]([\-a-zA-Z0-9])*){1,39})"/> + </xs:restriction> + </xs:simpleType> + <xs:simpleType name="KEY_VALUE_PAIR"> + <xs:restriction base="xs:string"> + <xs:pattern value="([\w_]+=[^,]+)?([,]?[ ]*[\w_]+=[^,]+)*"/> + </xs:restriction> + </xs:simpleType> + <xs:complexType name="credential"> + <xs:sequence minOccurs="1" maxOccurs="1" > + <xs:element name="userName" minOccurs="1" maxOccurs="1" type="xs:string"> + <xs:annotation> + <xs:documentation> + The user name for the datasource. + </xs:documentation> + </xs:annotation> + </xs:element> + + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element name="passwordFile" type="xs:string"> + <xs:annotation> + <xs:documentation> + The FQ path to a file on HDFS containing the datasource + server password with 400 permissions. Only the user + submitting the job has read access to this file, which + will be securely passed to the mappers. + </xs:documentation> + </xs:annotation> + </xs:element> + + <xs:element name="passwordText" type="xs:string"> + <xs:annotation> + <xs:documentation> + Plain text password.
+ </xs:documentation> + </xs:annotation> + </xs:element> + </xs:choice> + </xs:sequence> + <xs:attribute name="type" type="credentialtype" use="required"/> + </xs:complexType> + + <xs:simpleType name="credentialtype"> + <xs:annotation> + <xs:documentation> + Only user-password credentials are supported today; this can be extended later. + </xs:documentation> + </xs:annotation> + <xs:restriction base="xs:string"> + <xs:enumeration value="password-file" /> + <xs:enumeration value="password-text" /> + </xs:restriction> + </xs:simpleType> + + <xs:simpleType name="datasource-type"> + <xs:annotation> + <xs:documentation> + The datasource type can be MySQL, ORACLE, Teradata etc. + </xs:documentation> + </xs:annotation> + <xs:restriction base="xs:string"> + <xs:enumeration value="mysql"/> + <xs:enumeration value="oracle"/> + <xs:enumeration value="hsql"/> + </xs:restriction> + </xs:simpleType> + + <xs:complexType name="driver"> + <xs:annotation> + <xs:documentation> + Driver information. + </xs:documentation> + </xs:annotation> + <xs:sequence minOccurs="1" maxOccurs="1"> + <xs:element type="xs:string" name="clazz" minOccurs="1" maxOccurs="1"> + <xs:annotation> + <xs:documentation> + Fully qualified class name for the datasource driver used + for validating the datasource connection in Falcon. + </xs:documentation> + </xs:annotation> + </xs:element> + <xs:element type="xs:string" name="jar" minOccurs="1" maxOccurs="unbounded"> + <xs:annotation> + <xs:documentation> + Path to the connector jar files on HDFS that are shipped with the workflow. + The connector jar files must also be placed in the Oozie sharelib; since this + relies on the latest Sqoop 1.x features, a Sqoop 1.5 snapshot build is required. + </xs:documentation> + </xs:annotation> + </xs:element> + </xs:sequence> + </xs:complexType> + <xs:complexType name="ACL"> + <xs:annotation> + <xs:documentation> + Access control list for this datasource. + owner is the Owner of this entity. + group is the one that has read access - not used at this time.
+ permission is not enforced at this time. + </xs:documentation> + </xs:annotation> + <xs:attribute type="xs:string" name="owner"/> + <xs:attribute type="xs:string" name="group"/> + <xs:attribute type="xs:string" name="permission" default="*"/> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/feed-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd index 77b8f4b..2974dd6 100644 --- a/client/src/main/resources/feed-0.1.xsd +++ b/client/src/main/resources/feed-0.1.xsd @@ -130,7 +130,6 @@ <xs:attribute type="IDENTIFIER" name="name" use="required"/> <xs:attribute type="xs:string" name="description"/> </xs:complexType> - <xs:complexType name="cluster"> <xs:annotation> <xs:documentation> @@ -157,6 +156,7 @@ <xs:element type="validity" name="validity"/> <xs:element type="retention" name="retention"/> <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/> + <xs:element type="import" name="import" minOccurs="0" maxOccurs="1"/> <xs:choice minOccurs="0" maxOccurs="1"> <xs:element type="locations" name="locations" minOccurs="0"/> <xs:element type="catalog-table" name="table"/> @@ -166,8 +166,7 @@ <xs:attribute type="IDENTIFIER" name="name" use="required"/> <xs:attribute type="cluster-type" name="type" use="optional"/> <xs:attribute type="xs:string" name="partition" use="optional"/> - <xs:attribute type="frequency-type" name="delay" use="optional" /> - + <xs:attribute type="frequency-type" name="delay" use="optional" /> </xs:complexType> <xs:complexType name="partitions"> <xs:annotation> @@ -301,7 +300,6 @@ <xs:complexType name="partition"> <xs:attribute type="IDENTIFIER" name="name" use="required"/> </xs:complexType> - <xs:complexType name="notification"> <xs:annotation> <xs:documentation> @@ -331,7 +329,6 @@ </xs:attribute> <xs:attribute type="xs:string" name="to" use="required"/> </xs:complexType> - <xs:complexType name="ACL"> <xs:annotation> <xs:documentation> @@ -451,7 +448,90 @@ <xs:minLength value="1"/> </xs:restriction> </xs:simpleType> - + <xs:complexType name="import"> + <xs:sequence> + <xs:element type="source" name="source"/> + <xs:element type="arguments" name="arguments" minOccurs="0"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="source"> + <xs:annotation> + <xs:documentation> + Specifies the source entity name from which data will be imported. + This can be Database or other data source types in the future. + Table name specifies the table to import. + Extract type specifies an extraction method (full or incremental). + DeltaColumn specifies the column name on the source database table + to identify the new data since the last extraction. + Merge type specifies how the data will be organized on Hadoop. + The supported types are snapshot (as in a particular time) or append + (as in timeseries partitions).
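+ Example (illustrative; the datasource name and table are hypothetical):
+ <import>
+ <source name="mysql-db" tableName="customer">
+ <extract type="full">
+ <mergepolicy>snapshot</mergepolicy>
+ </extract>
+ </source>
+ </import>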
+ </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element type="extract" name="extract" minOccurs="1"/> + <xs:element type="fields-type" name="fields" minOccurs="0"/> + </xs:sequence> + <xs:attribute type="non-empty-string" name="name" use="required"/> + <xs:attribute type="non-empty-string" name="tableName" use="required"/> + </xs:complexType> + <xs:complexType name="extract"> + <xs:sequence> + <xs:element type="xs:string" name="deltacolumn" minOccurs="0" maxOccurs="1"/> + <xs:element type="merge-type" name="mergepolicy" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + <xs:attribute type="extract-method" name="type" use="required"/> + </xs:complexType> + <xs:simpleType name="extract-method"> + <xs:restriction base="xs:string"> + <xs:enumeration value="full"/> + <xs:enumeration value="incremental"/> + </xs:restriction> + </xs:simpleType> + <xs:simpleType name="merge-type"> + <xs:restriction base="xs:string"> + <xs:enumeration value="snapshot"/> + <xs:enumeration value="append"/> + </xs:restriction> + </xs:simpleType> + <xs:complexType name="fields-type"> + <xs:annotation> + <xs:documentation> + Specifies either an include or an exclude fields list. If an include field list is specified, only + the specified fields will be imported. If an exclude field list is specified, all fields except + the ones specified will be imported from the datasource to HDFS. + </xs:documentation> + </xs:annotation> + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element type="field-include-exclude" name="includes"/> + <xs:element type="field-include-exclude" name="excludes"/> + </xs:choice> + </xs:complexType> + <xs:complexType name="field-include-exclude"> + <xs:sequence> + <xs:element type="xs:string" name="field" maxOccurs="unbounded" minOccurs="1"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="arguments"> + <xs:annotation> + <xs:documentation> + A list of name-value pairs of extra arguments to be passed to the concrete implementation. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element type="argument" name="argument" maxOccurs="unbounded" minOccurs="0"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="argument"> + <xs:annotation> + <xs:documentation> + A key-value pair used while invoking + ingestion engines.
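+ Example (illustrative): an argument with name "--num-mappers" and value "4" is passed
+ through to the underlying Sqoop invocation; a value greater than 1 also requires a
+ "--split-by" argument (enforced by FeedEntityParser#validateFeedImportArgs below).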
+ </xs:documentation> + </xs:annotation> + <xs:attribute type="xs:string" name="name" use="required"/> + <xs:attribute type="xs:string" name="value" use="required"/> + </xs:complexType> <xs:complexType name="retention-stage"> <xs:annotation> <xs:documentation> @@ -469,5 +549,4 @@ <xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element> </xs:all> </xs:complexType> - </xs:schema> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/jaxb-binding.xjb ---------------------------------------------------------------------- diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb index 6f1d6c7..978145f 100644 --- a/client/src/main/resources/jaxb-binding.xjb +++ b/client/src/main/resources/jaxb-binding.xjb @@ -56,6 +56,15 @@ <inheritance:extends>org.apache.falcon.entity.v0.EntityNotification</inheritance:extends> </jaxb:bindings> + + <jaxb:bindings schemaLocation="datasource-0.1.xsd" node="//xs:complexType[@name='datasource']"> + <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends> + </jaxb:bindings> + + <jaxb:bindings schemaLocation="datasource-0.1.xsd" node="//xs:complexType[@name='ACL']"> + <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends> + </jaxb:bindings> + <jaxb:globalBindings> <xjc:simple/> </jaxb:globalBindings> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/mysql_database.xml ---------------------------------------------------------------------- diff --git a/client/src/main/resources/mysql_database.xml b/client/src/main/resources/mysql_database.xml new file mode 100644 index 0000000..5f88ba4 --- /dev/null +++ b/client/src/main/resources/mysql_database.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1"> + <tags>[email protected], [email protected]</tags> + <interfaces> + + <!-- ***** read interface ***** --> + <interface type="readonly" endpoint="jdbc:mysql://c6402/test"> + <credential type="password-file"> + <userName>sqoop_user</userName> + <passwordFile>/user/ambari-qa/password-store/password_read_user</passwordFile> + </credential> + </interface> + + <!-- ***** write interface ***** --> + <interface type="write" endpoint="jdbc:mysql://c6402/test"> + <credential type="password-file"> + <userName>sqoop2_user</userName> + <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile> + </credential> + </interface> + + <!-- ***** default credential ***** --> + <credential type="password-file"> + <userName>sqoop2_user</userName> + <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile> + </credential> + + </interfaces> +</datasource> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java new file mode 100644 index 0000000..f9b3966 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.entity; + +import org.apache.commons.io.IOUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.datasource.Credential; +import org.apache.falcon.entity.v0.datasource.Credentialtype; +import org.apache.falcon.entity.v0.datasource.Datasource; +import org.apache.falcon.entity.v0.datasource.DatasourceType; +import org.apache.falcon.entity.v0.datasource.Interface; +import org.apache.falcon.entity.v0.datasource.Interfaces; +import org.apache.falcon.entity.v0.datasource.Interfacetype; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; + +/** + * DataSource entity helper methods.
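+ * <p>
+ * Illustrative usage (assuming a feed cluster whose import policy references a datasource):
+ * <pre>
+ * Datasource ds = DatasourceHelper.getDatasource(feedCluster);
+ * String endpoint = DatasourceHelper.getReadOnlyEndpoint(ds);
+ * java.util.Properties creds = DatasourceHelper.fetchReadPasswordInfo(ds); // resolves password-file credentials
+ * </pre>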
+ */ + +public final class DatasourceHelper { + + private static final Logger LOG = LoggerFactory.getLogger(DatasourceHelper.class); + + private static final ConfigurationStore STORE = ConfigurationStore.get(); + + public static DatasourceType getImportSourceType(Cluster feedCluster) throws FalconException { + Datasource ds = STORE.get(EntityType.DATASOURCE, feedCluster.getImport().getSource().getName()); + return ds.getType(); + } + + private DatasourceHelper() {} + + public static Datasource getDatasource(Cluster feedCluster) throws FalconException { + return STORE.get(EntityType.DATASOURCE, feedCluster.getImport().getSource().getName()); + } + public static String getReadOnlyEndpoint(Datasource db) { + return getInterface(db, Interfacetype.READONLY); + } + + /** + * Returns user name and password pair as it is specified in the XML. If the credential type is + * password-file, the path name is returned. + * + * @param db + * @return user name and password pair + * @throws FalconException + */ + public static Pair<String, String> getReadPasswordInfo(Datasource db) throws FalconException { + for (Interface ifs : db.getInterfaces().getInterfaces()) { + if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) { + return getPasswordInfo(ifs.getCredential()); + } + } + return getDefaultPasswordInfo(db.getInterfaces()); + } + + /** + * Returns user name and actual password pair. If the credential type is password-file, then the + * password is read from the HDFS file. If the credential type is password-text, the clear text + * password is returned. + * + * @param db + * @return + * @throws FalconException + */ + public static java.util.Properties fetchReadPasswordInfo(Datasource db) throws FalconException { + Pair<String, String> passwdInfo = getReadPasswordInfo(db); + java.util.Properties p = new java.util.Properties(); + p.put("user", passwdInfo.first); + p.put("password", passwdInfo.second); + if (getReadPasswordType(db) == Credentialtype.PASSWORD_FILE) { + String actualPasswd = readPasswordInfoFromFile(passwdInfo.second); + p.put("password", actualPasswd); + } + return p; + } + + /** + * Given Datasource, return the read-only credential type. If read-only credential is missing, + * use interface's default credential. + * + * @param db + * @return Credentialtype + * @throws FalconException + */ + public static Credentialtype getReadPasswordType(Datasource db) throws FalconException { + for (Interface ifs : db.getInterfaces().getInterfaces()) { + if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) { + return getPasswordType(ifs.getCredential()); + } + } + return getDefaultPasswordType(db.getInterfaces()); + } + + /** + * Return the Interface endpoint for the interface type specified in the argument. 
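+ * Returns null if the datasource does not declare an interface of the requested type.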
+ * + * @param db + * @param type - can be read-only or write + * @return + */ + private static String getInterface(Datasource db, Interfacetype type) { + for(Interface ifs : db.getInterfaces().getInterfaces()) { + if (ifs.getType() == type) { + return ifs.getEndpoint(); + } + } + return null; + } + private static Credentialtype getPasswordType(Credential c) { + return c.getType(); + } + + private static Credentialtype getDefaultPasswordType(Interfaces ifs) throws FalconException { + + if (ifs.getCredential() != null) { + return ifs.getCredential().getType(); + } else { + throw new FalconException("Missing Interfaces default credential"); + } + } + + private static Pair<String, String> getDefaultPasswordInfo(Interfaces ifs) throws FalconException { + + if (ifs.getCredential() != null) { + return getPasswordInfo(ifs.getCredential()); + } else { + throw new FalconException("Missing Interfaces default credential"); + } + } + + private static Pair<String, String> getPasswordInfo(Credential c) throws FalconException { + String passwd = null; + if (c.getType() == Credentialtype.PASSWORD_FILE) { + passwd = c.getPasswordFile(); + } else { + passwd = c.getPasswordText(); + } + return new Pair<String, String>(c.getUserName(), passwd); + } + + private static String readPasswordInfoFromFile(String passwordFilePath) throws FalconException { + try { + Path path = new Path(passwordFilePath); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri()); + if (!fs.exists(path)) { + throw new IOException("The password file does not exist! " + + passwordFilePath); + } + + if (!fs.isFile(path)) { + throw new IOException("The password file cannot be a directory! " + + passwordFilePath); + } + + InputStream is = fs.open(path); + StringWriter writer = new StringWriter(); + try { + IOUtils.copy(is, writer); + return writer.toString(); + } finally { + IOUtils.closeQuietly(is); + IOUtils.closeQuietly(writer); + fs.close(); + } + } catch (IOException ioe) { + LOG.error("Error reading password file from HDFS : " + ioe); + throw new FalconException(ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index ceefb17..66dba6f 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -35,6 +35,7 @@ import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.datasource.DatasourceType; import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; @@ -681,7 +682,7 @@ public final class EntityUtil { //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml //Each entity update creates a new staging path //Base staging path is the base path for all staging dirs - public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) { + public static Path getBaseStagingPath(Cluster cluster, Entity entity) { return new Path(ClusterHelper.getLocation(cluster, 
ClusterLocationType.STAGING).getPath(), "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); } @@ -723,7 +724,7 @@ //Creates new staging path for entity schedule/update //Staging path contains md5 of the cluster view of the entity. This is required to check if update is required - public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) + public static Path getNewStagingPath(Cluster cluster, Entity entity) throws FalconException { Entity clusterView = getClusterView(entity, cluster.getName()); return new Path(getBaseStagingPath(cluster, entity), @@ -778,7 +779,7 @@ } } - public static Path getLogPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) { + public static Path getLogPath(Cluster cluster, Entity entity) { return new Path(getBaseStagingPath(cluster, entity), "logs"); } @@ -1001,6 +1002,20 @@ return result; } + /** + * Returns the data source type given a feed with an import policy. + * + * @param cluster + * @param feed + * @return + * @throws FalconException + */ + + public static DatasourceType getImportDatasourceType( + Cluster cluster, Feed feed) throws FalconException { + return FeedHelper.getImportDatasourceType(cluster, feed); + } + public static EntityNotification getEntityNotification(Entity entity) { switch (entity.getEntityType()) { case FEED: http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 5c252a8..2c65eba 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -26,14 +26,18 @@ import org.apache.falcon.entity.common.FeedDataPath; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.cluster.Property; +import org.apache.falcon.entity.v0.datasource.DatasourceType; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.ExtractMethod; +import org.apache.falcon.entity.v0.feed.FieldIncludeExclude; import org.apache.falcon.entity.v0.feed.Lifecycle; import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.MergeType; import org.apache.falcon.entity.v0.feed.RetentionStage; import org.apache.falcon.entity.v0.feed.Sla; import org.apache.falcon.entity.v0.process.Input; @@ -301,7 +305,7 @@ clusterVars.put("colo", cluster.getColo()); clusterVars.put("name", cluster.getName()); if (cluster.getProperties() != null) { - for (Property property : cluster.getProperties().getProperties()) { + for (org.apache.falcon.entity.v0.cluster.Property property : cluster.getProperties().getProperties()) { clusterVars.put(property.getName(), property.getValue()); } }
@@ -786,6 +790,184 @@ return result; } + + /** + * Returns the data source type associated with the Feed's import policy. + * + * @param clusterEntity + * @param feed + * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType} + * @throws FalconException + */ + public static DatasourceType getImportDatasourceType( + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, + Feed feed) throws FalconException { + Cluster feedCluster = getCluster(feed, clusterEntity.getName()); + return DatasourceHelper.getImportSourceType(feedCluster); + } + + /** + * Returns whether the import policy is enabled in the feed definition. + * + * @param feedCluster + * @return true if the import policy is enabled, else false + */ + + public static boolean isImportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + if (feedCluster.getType() == ClusterType.SOURCE) { + return (feedCluster.getImport() != null); + } + return false; + } + + /** + * Returns the data source name associated with the Feed's import policy. + * + * @param feedCluster + * @return DataSource name defined in the Datasource Entity + */ + public static String getImportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + if (isImportEnabled(feedCluster)) { + return feedCluster.getImport().getSource().getName(); + } else { + return null; + } + } + + /** + * Returns Datasource table name. + * + * @param feedCluster + * @return Table or Topic name of the Datasource + */ + + public static String getImportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + if (isImportEnabled(feedCluster)) { + return feedCluster.getImport().getSource().getTableName(); + } else { + return null; + } + } + + /** + * Returns the extract method type. + * + * @param feedCluster + * @return {@link org.apache.falcon.entity.v0.feed.ExtractMethod} + */ + + public static ExtractMethod getImportExtractMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + if (isImportEnabled(feedCluster)) { + return feedCluster.getImport().getSource().getExtract().getType(); + } else { + return null; + } + } + + /** + * Returns the merge type of the Feed import policy. + * + * @param feedCluster + * @return {@link org.apache.falcon.entity.v0.feed.MergeType} + */ + public static MergeType getImportMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + if (isImportEnabled(feedCluster)) { + return feedCluster.getImport().getSource().getExtract().getMergepolicy(); + } else { + return null; + } + } + + /** + * Returns the initial instance date for the import data set or coordinator. + * + * For the snapshot merge type, the current time is used since the source data is dumped in full. + * For the incremental merge type, the start date specified in the cluster validity is used. + * + * @param feedCluster + * @return Feed cluster validity start date or the current time + */ + public static Date getImportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + Date initialInstance = new Date(); + if (!FeedHelper.isSnapshotMergeType(feedCluster)) { + initialInstance = feedCluster.getValidity().getStart(); + } + return initialInstance; + } + + /** + * Helper method to check if the merge type is snapshot.
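+ * Used by getImportInitalInstance to choose between a current-time (snapshot) and a
+ * validity-start (incremental) initial instance.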
+ * + * @param feedCluster + * @return true if the feed import policy merge type is snapshot + * + */ + public static boolean isSnapshotMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { + return MergeType.SNAPSHOT == getImportMergeType(feedCluster); + } + + /** + * Returns extra arguments specified in the Feed import policy. + * + * @param feedCluster + * @return + * @throws FalconException + */ + public static Map<String, String> getImportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster) + throws FalconException { + + Map<String, String> argsMap = new HashMap<String, String>(); + if (feedCluster.getImport().getArguments() == null) { + return argsMap; + } + + for(org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getImport().getArguments().getArguments()) { + argsMap.put(p.getName().toLowerCase(), p.getValue()); + } + return argsMap; + } + + /** + * Returns the fields list specified in the import policy. + * + * @param feedCluster + * @return List of String + * @throws FalconException + */ + public static List<String> getFieldList(org.apache.falcon.entity.v0.feed.Cluster feedCluster) + throws FalconException { + if (feedCluster.getImport().getSource().getFields() == null) { + return null; + } + org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields(); + FieldIncludeExclude includeFileds = fieldType.getIncludes(); + if (includeFileds == null) { + return null; + } + return includeFileds.getFields(); + } + + + /** + * Returns true if exclude field lists are used. This is a TBD feature. + * + * @param feedCluster + * @return true if an exclude field list is used, false otherwise. + * @throws FalconException + */ + + public static boolean isFieldExcludes(org.apache.falcon.entity.v0.feed.Cluster feedCluster) + throws FalconException { + if (feedCluster.getImport().getSource().getFields() != null) { + org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields(); + FieldIncludeExclude excludeFileds = fieldType.getExcludes(); + if ((excludeFileds != null) && (excludeFileds.getFields().size() > 0)) { + return true; + } + } + return false; + } + public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName, Date instanceTime) throws FalconException { @@ -813,5 +995,4 @@ } return retentionFrequency; } - } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java new file mode 100644 index 0000000..e58b1e9 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.entity.parser; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.DatasourceHelper; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.datasource.ACL; +import org.apache.falcon.entity.v0.datasource.Datasource; +import org.apache.falcon.entity.v0.datasource.Interfacetype; +import org.apache.falcon.util.HdfsClassLoader; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.util.Arrays; +import java.util.Properties; + +/** + * Parser for DataSource entity definition. + */ + +public class DatasourceEntityParser extends EntityParser<Datasource> { + + private static final Logger LOG = LoggerFactory.getLogger(DatasourceEntityParser.class); + + public DatasourceEntityParser() { + super(EntityType.DATASOURCE); + } + + @Override + public void validate(Datasource db) throws FalconException { + ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader(); + try { + ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars()); + Thread.currentThread().setContextClassLoader(hdfsClassLoader); + validateInterface(db, Interfacetype.READONLY, hdfsClassLoader); + validateInterface(db, Interfacetype.WRITE, hdfsClassLoader); + validateACL(db); + } catch(IOException io) { + throw new ValidationException("Unable to copy driver jars to local dir: " + + Arrays.toString(db.getDriver().getJars().toArray())); + } finally { + Thread.currentThread().setContextClassLoader(previousClassLoader); + } + } + + private static void validateInterface(Datasource db, Interfacetype interfacetype, ClassLoader hdfsClassLoader) + throws ValidationException { + String endpoint = null; + try { + endpoint = DatasourceHelper.getReadOnlyEndpoint(db); + if (StringUtils.isNotBlank(endpoint)) { + LOG.info("Validating {} endpoint {} connection.", interfacetype.value(), endpoint); + Properties userPasswdInfo = DatasourceHelper.fetchReadPasswordInfo(db); + validateConnection(hdfsClassLoader, db.getDriver().getClazz(), endpoint, userPasswdInfo); + } + } catch(FalconException fe) { + throw new ValidationException(String.format("Cannot validate '%s' " + "interface '%s' " + "of database entity '%s' due to '%s' ", + interfacetype, endpoint, + db.getName(), fe.getMessage())); + } + } + + private static void validateConnection(ClassLoader hdfsClassLoader, String driverClass, + String connectUrl, Properties userPasswdInfo) + throws FalconException { + try { + java.sql.Driver driver = (java.sql.Driver) hdfsClassLoader.loadClass(driverClass).newInstance(); + LOG.info("Validating connection URL: {} using driver: {}", connectUrl, driver.getClass().toString()); + Connection con = driver.connect(connectUrl, userPasswdInfo); + if (con == null) { + throw new FalconException("DriverManager.getConnection() returned " + "null for URL : " + connectUrl); + } + } catch (Exception ex) { + LOG.error("Exception while
validating connection : ", ex); + throw new FalconException(ex); + } + } + + /** + * Validate ACL if authorization is enabled. + * + * @param db database entity + * @throws ValidationException + */ + private void validateACL(Datasource db) throws ValidationException { + if (isAuthorizationDisabled) { + return; + } + + // Validate the entity owner is logged-in, authenticated user if authorization is enabled + final ACL dbACL = db.getACL(); + if (dbACL == null) { + throw new ValidationException("Datasource ACL cannot be empty for: " + db.getName()); + } + + validateACLOwnerAndGroup(dbACL); + + try { + authorize(db.getName(), dbACL); + } catch (AuthorizationException e) { + throw new ValidationException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java index 5a33201..b497770 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java @@ -45,6 +45,8 @@ public final class EntityParserFactory { return new FeedEntityParser(); case CLUSTER: return new ClusterEntityParser(); + case DATASOURCE: + return new DatasourceEntityParser(); default: throw new IllegalArgumentException("Unhandled entity type: " + entityType); } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java index c5cfdd2..c70f18d 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java @@ -33,11 +33,14 @@ import org.apache.falcon.entity.v0.EntityGraph; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.feed.ACL; +import org.apache.falcon.entity.v0.feed.Extract; +import org.apache.falcon.entity.v0.feed.ExtractMethod; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.MergeType; import org.apache.falcon.entity.v0.feed.Properties; import org.apache.falcon.entity.v0.feed.Property; import org.apache.falcon.entity.v0.feed.Sla; @@ -55,8 +58,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; -import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.HashSet; import java.util.Set; import java.util.TimeZone; @@ -95,6 +99,12 @@ public class FeedEntityParser extends EntityParser<Feed> { cluster.getName()); validateClusterHasRegistry(feed, cluster); validateFeedCutOffPeriod(feed, cluster); + if (FeedHelper.isImportEnabled(cluster)) { + validateEntityExists(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster)); + 
validateFeedExtractionType(feed, cluster); + validateFeedImportArgs(cluster); + validateFeedImportFieldExcludes(cluster); + } } validateFeedStorage(feed); @@ -553,4 +563,54 @@ } } + + /** + * Validate extraction and merge type combination. Currently supported combinations: + * + * ExtractionType = FULL and MergeType = SNAPSHOT. + * ExtractionType = INCREMENTAL and MergeType = APPEND. + * + * @param feed Feed entity + * @param cluster Cluster referenced in the Feed definition + * @throws FalconException + */ + + private void validateFeedExtractionType(Feed feed, Cluster cluster) throws FalconException { + Extract extract = cluster.getImport().getSource().getExtract(); + + if (ExtractMethod.FULL == extract.getType()) { + if ((MergeType.SNAPSHOT != extract.getMergepolicy()) + || (extract.getDeltacolumn() != null)) { + throw new ValidationException(String.format("Feed %s is using FULL " + + "extract method but specifies either a superfluous " + + "deltacolumn or a mergepolicy other than snapshot", feed.getName())); + } + } else { + throw new ValidationException(String.format("Feed %s is using unsupported " + + "extraction mechanism %s", feed.getName(), extract.getType().value())); + } + } + + /** + * Validate import arguments. + * @param feedCluster Cluster referenced in the feed + */ + private void validateFeedImportArgs(Cluster feedCluster) throws FalconException { + Map<String, String> args = FeedHelper.getImportArguments(feedCluster); + int numMappers = 1; + if (args.containsKey("--num-mappers")) { + numMappers = Integer.parseInt(args.get("--num-mappers")); + } + if ((numMappers > 1) && (!args.containsKey("--split-by"))) { + throw new ValidationException(String.format("Feed import expects " + + "--split-by column when --num-mappers > 1")); + } + } + + private void validateFeedImportFieldExcludes(Cluster feedCluster) throws FalconException { + if (FeedHelper.isFieldExcludes(feedCluster)) { + throw new ValidationException(String.format("Field excludes are not supported " + + "currently in Feed import policy")); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index 4dd1c68..9c7a932 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -59,7 +59,7 @@ public final class ConfigurationStore implements FalconService { private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] { - EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, }; + EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE, }; public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED, EntityType.CLUSTER, }; http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java index bd4c6cf..e4d9385 100644 ---
a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java +++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java @@ -19,6 +19,7 @@ package org.apache.falcon.entity.v0; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; @@ -189,6 +190,16 @@ public final class EntityGraph implements ConfigurationChangeListener { Set<Node> clusterEdges = nodeEdges.get(clusterNode); feedEdges.add(clusterNode); clusterEdges.add(feedNode); + + if (FeedHelper.isImportEnabled(cluster)) { + Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster)); + if (!nodeEdges.containsKey(dbNode)) { + nodeEdges.put(dbNode, new HashSet<Node>()); + } + Set<Node> dbEdges = nodeEdges.get(dbNode); + feedEdges.add(dbNode); + dbEdges.add(feedNode); + } } return nodeEdges; } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java index bd32852..4c7e913 100644 --- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java +++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java @@ -46,6 +46,9 @@ public final class EntityIntegrityChecker { case FEED: return filter(deps, EntityType.PROCESS); + case DATASOURCE: + return filter(deps, EntityType.FEED); + default: return null; } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java index 8c3876c..25bbf0c 100644 --- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java @@ -20,10 +20,12 @@ package org.apache.falcon.metadata; import com.tinkerpop.blueprints.Graph; import com.tinkerpop.blueprints.Vertex; +import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.ProcessHelper; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Input; @@ -64,6 +66,10 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { case FEED: addFeedEntity((Feed) entity); break; + case DATASOURCE: + addDatasourceEntity((Datasource) entity); + break; + default: throw new IllegalArgumentException("Invalid EntityType " + entityType); } @@ -91,8 +97,25 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE); } } + + for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { + if 
(FeedHelper.isImportEnabled(feedCluster)) { + addRelationToDatasource(feedVertex, FeedHelper.getImportDatasourceName(feedCluster), + RelationshipLabel.DATASOURCE_IMPORT_EDGE); + } + } + } + + public void addDatasourceEntity(Datasource dsEntity) { + LOG.info("Adding datasource entity: {}", dsEntity.getName()); + Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY); + + addUserRelation(dsVertex); + addColoRelation(dsEntity.getColo(), dsVertex); + addDataClassification(dsEntity.getTags(), dsVertex); } + public void updateEntity(Entity oldEntity, Entity newEntity) { EntityType entityType = oldEntity.getEntityType(); switch (entityType) { @@ -177,6 +200,16 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { addEdge(fromVertex, clusterVertex, edgeLabel.getName()); } + public void addRelationToDatasource(Vertex fromVertex, String datasourceName, RelationshipLabel edgeLabel) { + Vertex datasourceVertex = findVertex(datasourceName, RelationshipType.DATASOURCE_ENTITY); + if (datasourceVertex == null) { // the datasource entity must exist before dependent entities are added + LOG.error("Illegal State: Datasource entity vertex must exist for {}", datasourceName); + throw new IllegalStateException("Datasource entity vertex must exist: " + datasourceName); + } + + addEdge(fromVertex, datasourceVertex, edgeLabel.getName()); + } + public void addInputFeeds(Inputs inputs, Vertex processVertex) { if (inputs == null) { return; http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java index f485764..b709857 100644 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java @@ -267,6 +267,39 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { } } + + public void addImportedInstance(WorkflowExecutionContext context) throws FalconException { + + String feedName = context.getOutputFeedNames(); + String feedInstanceDataPath = context.getOutputFeedInstancePaths(); + String datasourceName = context.getDatasourceName(); + String sourceClusterName = context.getSrcClusterName(); + + LOG.info("Computing import feed instance for: name={} path={}, in cluster: {} " + + "from datasource: {}", feedName, + feedInstanceDataPath, sourceClusterName, datasourceName); + String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName, + feedInstanceDataPath, context.getNominalTimeAsISO8601()); + Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); + + LOG.info("Vertex exists?
name={}, type={}, v={}", + feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); + if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon + LOG.info("{} instance vertex {} does not exist, add it", + RelationshipType.FEED_INSTANCE, feedInstanceName); + feedInstanceVertex = addFeedInstance(// add a new instance + feedInstanceName, context, feedName, context.getSrcClusterName()); + } + addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY, + RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601()); + addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY, + RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601()); + } + + public String getImportInstanceName(WorkflowExecutionContext context) { + return context.getEntityName() + "/" + context.getNominalTimeAsISO8601(); + } + private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel, WorkflowExecutionContext context, String feedName, String feedInstanceDataPath) throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java index 56fbde0..cf2b651 100644 --- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java +++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java @@ -287,8 +287,11 @@ public class MetadataMappingService case DELETE: onFeedInstanceEvicted(context); break; + case IMPORT: + onFeedInstanceImported(context); + break; default: - throw new IllegalArgumentException("Invalid EntityOperation" + entityOperation); + throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation); } } @@ -328,4 +331,8 @@ public class MetadataMappingService LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601()); instanceGraphBuilder.addEvictedInstance(context); } + private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException { + LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601()); + instanceGraphBuilder.addImportedInstance(context); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java index 5b312da..6d4bf46 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java @@ -28,6 +28,7 @@ public enum RelationshipLabel { PROCESS_CLUSTER_EDGE("runs-on"), FEED_PROCESS_EDGE("input"), PROCESS_FEED_EDGE("output"), + DATASOURCE_IMPORT_EDGE("import"), // instance edge labels INSTANCE_ENTITY_EDGE("instance-of"), http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java 
b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java new file mode 100644 index 0000000..3f9091f --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Helper class loader that fetches jars from an HDFS location and loads them into the JVM. + */ + +public class HdfsClassLoader extends URLClassLoader { + + private static final Logger LOG = LoggerFactory.getLogger(HdfsClassLoader.class); + private static Map<String, HdfsClassLoader> classLoaderCache = new ConcurrentHashMap<String, HdfsClassLoader>(); + private static final Object LOCK = new Object(); + + public static ClassLoader load(final String name, final List<String> jarHdfsPath) throws IOException { + LOG.info("ClassLoader cache size = {}", classLoaderCache.size()); + if (classLoaderCache.containsKey(name)) { + return classLoaderCache.get(name); + } + + synchronized (LOCK) { + if (classLoaderCache.containsKey(name)) { // re-check under the lock so two threads do not both build the loader + return classLoaderCache.get(name); + } + LOG.info("Copying jar files from HDFS to local dir"); + final URL[] urls = copyHdfsJarFilesToTempDir(name, jarHdfsPath); + final ClassLoader parentClassLoader = HdfsClassLoader.class.getClassLoader(); + LOG.info("Creating a new HdfsClassLoader for name = {} with parent = {} using classpath = {}", + name, parentClassLoader.toString(), Arrays.toString(jarHdfsPath.toArray())); + HdfsClassLoader hdfsClassLoader = java.security.AccessController.doPrivileged( + new java.security.PrivilegedAction<HdfsClassLoader>() { + @Override + public HdfsClassLoader run() { + return new HdfsClassLoader(name, urls, parentClassLoader); + } + } + ); + classLoaderCache.put(name, hdfsClassLoader); + return hdfsClassLoader; + } + } + + private final ClassLoader realParent; + + public HdfsClassLoader(String name, URL[] urls, ClassLoader parentClassLoader) { + // pass null as the parent so that delegation is controlled explicitly via realParent + super(urls, null); + this.realParent = parentClassLoader; + } + + @Override + protected Class<?> loadClass(String name, boolean resolve) + throws ClassNotFoundException { + + // Load through the parent class loader first and then fall back to this class loader.
+ try { + return realParent.loadClass(name); + } catch (Throwable t) { + return super.loadClass(name, resolve); + } + } + + @Override + public URL getResource(String name) { + // This is the same as the jdk's getResource except the parent + // is taken from the realParent member instead of the parent member. + URL url = realParent.getResource(name); + if (url == null) { + url = findResource(name); + } + return url; + } + + private static URL[] copyHdfsJarFilesToTempDir(String databaseName, List<String> jars) throws IOException { + List<URL> urls = new ArrayList<URL>(); + + final Configuration conf = new Configuration(); + Path localPath = createTempDir(databaseName, conf); + + for (String jar : jars) { + Path jarPath = new Path(jar); + final FileSystem fs = jarPath.getFileSystem(conf); + if (fs.isFile(jarPath) && jarPath.getName().endsWith(".jar")) { + LOG.info("Copying jarFile = {}", jarPath); + fs.copyToLocalFile(jarPath, localPath); + } + } + urls.addAll(getJarsInPath(localPath.toUri().toURL())); + + return urls.toArray(new URL[urls.size()]); + } + + private static Path createTempDir(String databaseName, Configuration conf) throws IOException { + // check the raw system property; the formatted "file://..." value could never be blank + String tmpDir = System.getProperty("java.io.tmpdir"); + String tmpBaseDir = StringUtils.isBlank(tmpDir) + ? "file:///tmp" : String.format("file://%s", tmpDir); + Path localPath = new Path(tmpBaseDir, databaseName); + localPath.getFileSystem(conf).mkdirs(localPath); + return localPath; + } + + private static List<URL> getJarsInPath(URL fileURL) throws MalformedURLException { + List<URL> urls = new ArrayList<URL>(); + + File file = new File(fileURL.getPath()); + if (file.isDirectory()) { + File[] jarFiles = file.listFiles(new FileFilter() { + @Override + public boolean accept(File file) { + return file.isFile() && file.getName().endsWith(".jar"); + } + }); + + for (File jarFile : jarFiles) { + urls.add(jarFile.toURI().toURL()); + } + + if (!fileURL.toString().endsWith("/")) { + fileURL = new URL(fileURL.toString() + "/"); + } + } + + urls.add(fileURL); + return urls; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java index ac7140c..915e8c2 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java @@ -38,6 +38,7 @@ public enum WorkflowExecutionArgs { // Exactly same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING and // the values in conf.
DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false), + DATASOURCE_NAME("datasource", "name of the datasource", false), // who WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"), http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 9bfc51b..899165b 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; + /** * Captures the workflow execution context. */ @@ -74,7 +75,7 @@ public class WorkflowExecutionContext { * Entity operations supported. */ public enum EntityOperations { - GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD + GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD, IMPORT } public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = { @@ -299,9 +300,12 @@ public class WorkflowExecutionContext { } public long getExecutionCompletionTime() { + return creationTime; } + public String getDatasourceName() { return getValue(WorkflowExecutionArgs.DATASOURCE_NAME); } + public long getWorkflowStartTime() { return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME)); } http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java index aab9cee..a6d607b 100644 --- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java +++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java @@ -58,6 +58,7 @@ public class AbstractTestBase { protected static final String FEED3_XML = "/config/feed/feed-0.3.xml"; protected static final String FEED4_XML = "/config/feed/feed-0.4.xml"; protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml"; + protected static final String DATASOURCE_XML = "/config/datasource/datasource-0.1.xml"; protected EmbeddedCluster dfsCluster; protected Configuration conf = new Configuration(); private ConfigurationStore store; http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java b/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java index 640e87d..5a4d6ec 100644 --- a/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java +++ b/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java @@ -38,6 +38,7 @@ public class EntityTypeTest { Assert.assertTrue(EntityType.PROCESS.isSchedulable()); Assert.assertTrue(EntityType.FEED.isSchedulable()); Assert.assertFalse(EntityType.CLUSTER.isSchedulable()); + Assert.assertFalse(EntityType.DATASOURCE.isSchedulable()); } @Test @@ -48,6 +49,8 @@ public class EntityTypeTest { Assert.assertEquals(EntityType.CLUSTER, EntityType.getEnum("cluSTER")); 
Assert.assertEquals(EntityType.PROCESS, EntityType.getEnum("process")); Assert.assertEquals(EntityType.PROCESS, EntityType.getEnum("pRocess")); + Assert.assertEquals(EntityType.DATASOURCE, EntityType.getEnum("datasource")); + Assert.assertEquals(EntityType.DATASOURCE, EntityType.getEnum("dataSource")); } @Test(expectedExceptions = IllegalArgumentException.class)
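For readers wiring up the new datasource support, here is a minimal, hypothetical sketch of how the HdfsClassLoader introduced above might be exercised. The loader name, namenode URI, and connector jar path are illustrative assumptions, not values shipped with this commit; only the load(name, jarHdfsPaths) entry point and the parent-first delegation come from the code above.

    import java.util.Arrays;

    // Hypothetical usage sketch for org.apache.falcon.util.HdfsClassLoader.
    public class HdfsClassLoaderSketch {
        public static void main(String[] args) throws Exception {
            // Assumed jar location on HDFS -- adjust to your cluster.
            ClassLoader loader = org.apache.falcon.util.HdfsClassLoader.load(
                    "mysql-db",
                    Arrays.asList("hdfs://namenode:8020/falcon/libs/mysql-connector-java.jar"));
            // Classes resolve through the real parent first, then fall back
            // to the jars copied down from HDFS (see loadClass above).
            Class<?> driver = loader.loadClass("com.mysql.jdbc.Driver");
            System.out.println("Loaded " + driver.getName());
        }
    }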
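Likewise, the --num-mappers/--split-by rule enforced by validateFeedImportArgs can be seen in isolation with a small, self-contained sketch; it mirrors the validation above but is not Falcon code:

    import java.util.HashMap;
    import java.util.Map;

    // Standalone illustration: a parallel import (--num-mappers > 1) must name
    // a --split-by column to partition the source query; a single mapper need not.
    public class ImportArgsRuleSketch {
        static boolean isValid(Map<String, String> args) {
            int numMappers = args.containsKey("--num-mappers")
                    ? Integer.parseInt(args.get("--num-mappers")) : 1;
            return numMappers <= 1 || args.containsKey("--split-by");
        }

        public static void main(String[] argv) {
            Map<String, String> args = new HashMap<String, String>();
            args.put("--num-mappers", "4");
            System.out.println(isValid(args)); // false: missing --split-by
            args.put("--split-by", "id");
            System.out.println(isValid(args)); // true: split column supplied
        }
    }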
