Repository: falcon Updated Branches: refs/heads/master e6526d829 -> 2b84dd56d
FALCON-1949 Update Data source Entity + Updates data source entity in the config store + Rewrites the corresponding IMPORT and EXPORT bundles with end and start dates for old and new bundles. + UT and IT + Manual end-to-end testing. Author: Venkatesan Ramachandran <[email protected]> Reviewers: "Balu Vellanki <[email protected]>, Venkat Ranganathan <[email protected]>, Ying Zheng <[email protected]>" Closes #157 from vramachan/FALCON-1949.UpdateDataSource Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2b84dd56 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2b84dd56 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2b84dd56 Branch: refs/heads/master Commit: 2b84dd56d291299f22ba872c1b0814cdad5692fe Parents: e6526d8 Author: Venkatesan Ramachandran <[email protected]> Authored: Mon Jun 6 17:58:44 2016 -0700 Committer: bvellanki <[email protected]> Committed: Mon Jun 6 17:58:44 2016 -0700 ---------------------------------------------------------------------- client/src/main/resources/feed-0.1.xsd | 1 + .../apache/falcon/entity/DatasourceHelper.java | 185 ++++++++++++++++++- .../org/apache/falcon/entity/FeedHelper.java | 2 - .../entity/parser/DatasourceEntityParser.java | 4 + .../falcon/entity/store/ConfigurationStore.java | 5 + .../apache/falcon/entity/v0/EntityGraph.java | 1 - .../EntityRelationshipGraphBuilder.java | 34 +++- .../falcon/metadata/RelationshipLabel.java | 1 + .../org/apache/falcon/update/UpdateHelper.java | 34 ++++ .../apache/falcon/entity/AbstractTestBase.java | 20 ++ .../entity/store/ConfigurationStoreTest.java | 51 ++++- .../apache/falcon/update/UpdateHelperTest.java | 43 +++++ .../falcon/resource/AbstractEntityManager.java | 122 ++++++++++-- .../falcon/lifecycle/DatasourceUpdateIT.java | 95 ++++++++++ .../org/apache/falcon/resource/TestContext.java | 1 + .../src/test/resources/datasource-template5.xml | 46 +++++ 16 files changed, 615 insertions(+), 30 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/client/src/main/resources/feed-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd index 3488233..cbc97b9 100644 --- a/client/src/main/resources/feed-0.1.xsd +++ b/client/src/main/resources/feed-0.1.xsd @@ -489,6 +489,7 @@ </xs:sequence> <xs:attribute type="non-empty-string" name="name" use="required"/> <xs:attribute type="non-empty-string" name="tableName" use="required"/> + <xs:attribute type="xs:int" name="version" use="optional" default="0"/> </xs:complexType> <xs:complexType name="extract"> <xs:sequence> http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java index 51ce898..e035bb5 100644 --- a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java @@ -27,10 +27,12 @@ import org.apache.falcon.entity.v0.datasource.Credential; import org.apache.falcon.entity.v0.datasource.Credentialtype; import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.datasource.DatasourceType; +import org.apache.falcon.entity.v0.datasource.Driver; import org.apache.falcon.entity.v0.datasource.Interface; import org.apache.falcon.entity.v0.datasource.Interfaces; import org.apache.falcon.entity.v0.datasource.Interfacetype; import org.apache.falcon.entity.v0.datasource.PasswordAliasType; +import org.apache.falcon.entity.v0.datasource.Property; import org.apache.falcon.security.CurrentUser; import 
org.apache.hadoop.conf.Configuration; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -46,6 +48,8 @@ import java.io.InputStream; import java.io.StringWriter; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; /** * DataSource entity helper methods. @@ -70,11 +74,11 @@ public final class DatasourceHelper { } public static String getReadOnlyEndpoint(Datasource datasource) { - return getInterface(datasource, Interfacetype.READONLY); + return getInterfaceEndpoint(datasource, Interfacetype.READONLY); } public static String getWriteEndpoint(Datasource datasource) { - return getInterface(datasource, Interfacetype.WRITE); + return getInterfaceEndpoint(datasource, Interfacetype.WRITE); } /** @@ -148,16 +152,158 @@ public final class DatasourceHelper { } /** + * checks if two datasource interfaces are same. + * + * @param oldEntity old datasource entity + * @param newEntity new datasource entity + * @param ifacetype type of interface + * @return true if same else false + */ + public static boolean isSameInterface(Datasource oldEntity, Datasource newEntity, Interfacetype ifacetype) { + LOG.debug("Verifying if Interfaces match for Datasource {} : Old - {}, New - {}", oldEntity, newEntity); + Interface oIface = getInterface(oldEntity, ifacetype); + Interface nIface = getInterface(newEntity, ifacetype); + if ((oIface == null) && (nIface == null)) { + return true; + } + if ((oIface == null) || (nIface == null)) { + return false; + } + + return (StringUtils.equals(oIface.getEndpoint(), nIface.getEndpoint()) + && isSameDriverClazz(oIface.getDriver(), nIface.getDriver()) + && isSameCredentials(oIface.getCredential(), nIface.getCredential())); + + } + + /** + * check if datasource driver is same. 
+ * @param oldEntity + * @param newEntity + * @return true if same or false + */ + + public static boolean isSameDriverClazz(Driver oldEntity, Driver newEntity) { + if ((oldEntity == null) && (newEntity == null)) { + return true; + } + if ((oldEntity == null) || (newEntity == null)) { + return false; + } + return StringUtils.equals(oldEntity.getClazz(), newEntity.getClazz()); + } + + /** + * checks if data source properties are same. + * @param oldEntity + * @param newEntity + * @return true if same else false + */ + + public static boolean isSameProperties(Datasource oldEntity, Datasource newEntity) { + Map<String, String> oldProps = getDatasourceProperties(oldEntity); + Map<String, String> newProps = getDatasourceProperties(newEntity); + return oldProps.equals(newProps); + } + + /** + * checks if data source credentials are same. + * @param oCred old credential, may be null + * @param nCred new credential, may be null + * @return true if both are absent or both are equal, else false + */ + public static boolean isSameCredentials(Credential oCred, Credential nCred) { + if ((oCred == null) && (nCred == null)) { + return true; + } + if ((oCred == null) || (nCred == null)) { + return false; + } + if (StringUtils.equals(oCred.getUserName(), nCred.getUserName())) { + if (oCred.getType() == nCred.getType()) { + if (oCred.getType() == Credentialtype.PASSWORD_TEXT) { + return StringUtils.equals(oCred.getPasswordText(), nCred.getPasswordText()); + } else if (oCred.getType() == Credentialtype.PASSWORD_FILE) { + return StringUtils.equals(oCred.getPasswordFile(), nCred.getPasswordFile()); + } else if (oCred.getType() == Credentialtype.PASSWORD_ALIAS) { + return (StringUtils.equals(oCred.getPasswordAlias().getAlias(), + nCred.getPasswordAlias().getAlias()) + && StringUtils.equals(oCred.getPasswordAlias().getProviderPath(), + nCred.getPasswordAlias().getProviderPath())); + } + } else { + return false; + } + } + return false; + } + + public static Credential getCredential(Datasource db) { + return getCredential(db, null); + } + + public static Credential 
getCredential(Datasource db, Interfacetype interfaceType) { + if (interfaceType == null) { + return db.getInterfaces().getCredential(); + } else { + for(Interface iface : db.getInterfaces().getInterfaces()) { + if (iface.getType() == interfaceType) { + return iface.getCredential(); + } + } + } + return null; + } + + public static void validateCredential(Credential cred) throws FalconException { + if (cred == null) { + return; + } + switch (cred.getType()) { + case PASSWORD_TEXT: + if (StringUtils.isBlank(cred.getUserName()) || StringUtils.isBlank(cred.getPasswordText())) { + throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s'", + cred.getType().value(), "userName", "passwordText")); + } + break; + case PASSWORD_FILE: + if (StringUtils.isBlank(cred.getUserName()) || StringUtils.isBlank(cred.getPasswordFile())) { + throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s'", + cred.getType().value(), "userName", "passwordFile")); + } + break; + case PASSWORD_ALIAS: + if (StringUtils.isBlank(cred.getUserName()) || (cred.getPasswordAlias() == null) + || StringUtils.isBlank(cred.getPasswordAlias().getAlias()) + || StringUtils.isBlank(cred.getPasswordAlias().getProviderPath())) { + throw new FalconException(String.format("Credential type '%s' missing tags '%s' or '%s' or '%s'", + cred.getType().value(), "userName", "alias", "providerPath")); + } + break; + default: + throw new FalconException(String.format("Unknown Credential type '%s'", cred.getType().value())); + } + } + + /** * Return the Interface endpoint for the interface type specified in the argument. 
* * @param db * @param type - can be read-only or write * @return */ - private static String getInterface(Datasource db, Interfacetype type) { + private static String getInterfaceEndpoint(Datasource db, Interfacetype type) { + if (getInterface(db, type) != null) { + return getInterface(db, type).getEndpoint(); + } else { + return null; + } + } + + private static Interface getInterface(Datasource db, Interfacetype type) { for(Interface ifs : db.getInterfaces().getInterfaces()) { if (ifs.getType() == type) { - return ifs.getEndpoint(); + return ifs; } } return null; @@ -172,6 +318,12 @@ public final class DatasourceHelper { } } + /** + * fetch password from the corresponding store. + * @param c + * @return actual password + * @throws FalconException + */ private static String fetchPasswordInfoFromCredentialStore(final PasswordAliasType c) throws FalconException { try { final String credPath = c.getProviderPath(); @@ -216,6 +368,15 @@ public final class DatasourceHelper { throw new FalconException(msg, ioe); } } + + /** + * fetch the password from file. 
+ * + * @param passwordFilePath + * @return + * @throws FalconException + */ + private static String fetchPasswordInfoFromFile(String passwordFilePath) throws FalconException { try { Path path = new Path(passwordFilePath); @@ -245,4 +406,20 @@ public final class DatasourceHelper { throw new FalconException(ioe); } } + + + /* + returns data store properties + */ + + private static Map<String, String> getDatasourceProperties(final Datasource datasource) { + Map<String, String> returnProps = new HashMap<String, String>(); + if (datasource.getProperties() != null) { + for (Property prop : datasource.getProperties().getProperties()) { + returnProps.put(prop.getName(), prop.getValue()); + } + } + return returnProps; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index cca2d8b..ee6837e 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -874,8 +874,6 @@ public final class FeedHelper { } } - - /** * Returns Datasource table name. 
* http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java index 998f952..358fbce 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java @@ -51,6 +51,10 @@ public class DatasourceEntityParser extends EntityParser<Datasource> { public void validate(Datasource db) throws FalconException { try { ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars()); + DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db)); + DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db, Interfacetype.READONLY)); + DatasourceHelper.validateCredential(DatasourceHelper.getCredential(db, Interfacetype.WRITE)); + validateInterface(db, Interfacetype.READONLY, hdfsClassLoader); validateInterface(db, Interfacetype.WRITE, hdfsClassLoader); validateACL(db); http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index 7f2b172..debf106 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.AccessControlList; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import 
org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.service.FalconService; @@ -266,6 +267,10 @@ public final class ConfigurationStore implements FalconService { if (UpdateHelper.isClusterEntityUpdated((Cluster) oldentity, (Cluster) newEntity)) { EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1); } + } else if (oldentity.getEntityType().equals(EntityType.DATASOURCE)) { + if (UpdateHelper.isDatasourceEntityUpdated((Datasource) oldentity, (Datasource) newEntity)) { + EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1); + } } else if (!EntityUtil.equals(oldentity, newEntity)) { // Increase version for other entities if they actually changed. EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity)); http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java index acb570e..b5f6788 100644 --- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java +++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java @@ -200,7 +200,6 @@ public final class EntityGraph implements ConfigurationChangeListener { feedEdges.add(dbNode); dbEdges.add(feedNode); } - if (FeedHelper.isExportEnabled(cluster)) { Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster)); if (!nodeEdges.containsKey(dbNode)) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff 
--git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java index e6851df..af0a8b9 100644 --- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java @@ -80,7 +80,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY); addUserRelation(clusterVertex); - addColoRelation(clusterEntity.getColo(), clusterVertex); + addColoRelation(clusterEntity.getColo(), clusterVertex, RelationshipLabel.CLUSTER_COLO); addDataClassification(clusterEntity.getTags(), clusterVertex); } @@ -111,7 +111,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY); addUserRelation(dsVertex); - addColoRelation(dsEntity.getColo(), dsVertex); + addColoRelation(dsEntity.getColo(), dsVertex, RelationshipLabel.DATASOURCE_COLO); addDataClassification(dsEntity.getTags(), dsVertex); } @@ -128,11 +128,27 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { case FEED: updateFeedEntity((Feed) oldEntity, (Feed) newEntity); break; + case DATASOURCE: + updateDatasourceEntity((Datasource) oldEntity, (Datasource) newEntity); + break; default: throw new IllegalArgumentException("Invalid EntityType " + entityType); } } + private void updateDatasourceEntity(Datasource oldDatasource, Datasource newDatasouce) { + LOG.info("Updating Datasource entity: {}", newDatasouce.getName()); + + Vertex dsEntityVertex = findVertex(oldDatasource.getName(), RelationshipType.DATASOURCE_ENTITY); + if (dsEntityVertex == null) { + LOG.error("Illegal State: Datasource entity vertex must exist for {}", oldDatasource.getName()); + throw new 
IllegalStateException(oldDatasource.getName() + " entity vertex must exist."); + } + updateColoEdge(oldDatasource.getColo(), newDatasouce.getColo(), dsEntityVertex, + RelationshipLabel.DATASOURCE_COLO); + updateDataClassification(oldDatasource.getTags(), newDatasouce.getTags(), dsEntityVertex); + } + private void updateClusterEntity(Cluster oldCluster, Cluster newCluster) { LOG.info("Updating Cluster entity: {}", newCluster.getName()); Vertex clusterEntityVertex = findVertex(oldCluster.getName(), RelationshipType.CLUSTER_ENTITY); @@ -140,25 +156,27 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { LOG.error("Illegal State: Cluster entity vertex must exist for {}", oldCluster.getName()); throw new IllegalStateException(oldCluster.getName() + " entity vertex must exist."); } - updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex); + updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex, + RelationshipLabel.CLUSTER_COLO); updateDataClassification(oldCluster.getTags(), newCluster.getTags(), clusterEntityVertex); } - private void updateColoEdge(String oldColo, String newColo, Vertex clusterEntityVertex) { + private void updateColoEdge(String oldColo, String newColo, Vertex clusterEntityVertex, + RelationshipLabel relLabel) { if (areSame(oldColo, newColo)) { return; } Vertex oldColoVertex = findVertex(oldColo, RelationshipType.COLO); if (oldColoVertex != null) { - removeEdge(clusterEntityVertex, oldColoVertex, RelationshipLabel.CLUSTER_COLO.getName()); + removeEdge(clusterEntityVertex, oldColoVertex, relLabel.getName()); } Vertex newColoVertex = findVertex(newColo, RelationshipType.COLO); if (newColoVertex == null) { newColoVertex = addVertex(newColo, RelationshipType.COLO); } - addEdge(clusterEntityVertex, newColoVertex, RelationshipLabel.CLUSTER_COLO.getName()); + addEdge(clusterEntityVertex, newColoVertex, relLabel.getName()); } public void updateFeedEntity(Feed oldFeed, Feed newFeed) { 
@@ -211,9 +229,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex); } - public void addColoRelation(String colo, Vertex fromVertex) { + public void addColoRelation(String colo, Vertex fromVertex, RelationshipLabel relLabel) { Vertex coloVertex = addVertex(colo, RelationshipType.COLO); - addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName()); + addEdge(fromVertex, coloVertex, relLabel.getName()); } public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java index a146957..e8ea0e4 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java @@ -39,6 +39,7 @@ public enum RelationshipLabel { USER("owned-by"), GROUPS("grouped-as"), PIPELINES("pipeline"), + DATASOURCE_COLO("datasource-colo"), // replication labels FEED_CLUSTER_REPLICATED_EDGE("replicated-to"), http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/main/java/org/apache/falcon/update/UpdateHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java index ae88a01..266319f 100644 --- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java +++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java @@ -21,6 +21,7 @@ package org.apache.falcon.update; import 
org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.DatasourceHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.ProcessHelper; @@ -29,6 +30,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; @@ -166,4 +168,36 @@ public final class UpdateHelper { return false; } + + public static boolean isDatasourceEntityUpdated(final Datasource oldEntity, final Datasource newEntity) + throws FalconException { + // ignore changes : colo, acl, description, tags + // can't change : name, data source entity type + + // major change that trigger bundle rewrite + // driver class name change but not driver jar as it is automatically picked up from share lib + + if (!DatasourceHelper.isSameDriverClazz(oldEntity.getDriver(), newEntity.getDriver())) { + return true; + } + + // interface endpoint, credential or driver update will trigger a bundle rewrite + for(org.apache.falcon.entity.v0.datasource.Interfacetype ifacetype + : org.apache.falcon.entity.v0.datasource.Interfacetype.values()) { + if (!DatasourceHelper.isSameInterface(oldEntity, newEntity, ifacetype)) { + return true; + } + } + // check default credential too + if (!DatasourceHelper.isSameCredentials(oldEntity.getInterfaces().getCredential(), + newEntity.getInterfaces().getCredential())) { + return true; + } + + // any change in the properties will trigger a bundle rewrite + if (!DatasourceHelper.isSameProperties(oldEntity, newEntity)) { + return true; + } + return false; + } } 
http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java index 3745955..3817056 100644 --- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java +++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -136,6 +137,13 @@ public class AbstractTestBase { store.publish(type, process); break; + case DATASOURCE: + Datasource datasource = (Datasource) unmarshaller.unmarshal(this.getClass().getResource(DATASOURCE_XML)); + datasource.setName(name); + datasource.setVersion(0); + decorateACL(proxyUser, defaultGroupName, datasource); + store.publish(type, datasource); + break; default: } } @@ -158,6 +166,18 @@ public class AbstractTestBase { cluster.setACL(clusterACL); } + private void decorateACL(String proxyUser, String defaultGroupName, Datasource datasource) { + if (datasource.getACL() != null) { + return; + } + + org.apache.falcon.entity.v0.datasource.ACL dsACL = + new org.apache.falcon.entity.v0.datasource.ACL(); + dsACL.setOwner(proxyUser); + dsACL.setGroup(defaultGroupName); + datasource.setACL(dsACL); + } + private void decorateACL(String proxyUser, String defaultGroupName, Feed feed) { if (feed.getACL() != null) { return; 
http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java index 8056e80..0febca5 100644 --- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java +++ b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java @@ -23,6 +23,13 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.datasource.Credential; +import org.apache.falcon.entity.v0.datasource.Credentialtype; +import org.apache.falcon.entity.v0.datasource.Datasource; +import org.apache.falcon.entity.v0.datasource.Interfaces; +import org.apache.falcon.entity.v0.datasource.Interface; +import org.apache.falcon.entity.v0.datasource.Interfacetype; +import org.apache.falcon.entity.v0.datasource.DatasourceType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.util.StartupProperties; @@ -112,7 +119,7 @@ public class ConfigurationStoreTest { } @Test - public void testUpdate() throws Exception { + public void testClusterUpdate() throws Exception { Cluster cluster1 = createClusterObj(); store.publish(EntityType.CLUSTER, cluster1); Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0); @@ -132,6 +139,28 @@ public class ConfigurationStoreTest { Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 1); } + @Test + public void testDatasourceUpdate() throws Exception { + Datasource ds1 = createDatasourceObj(); + 
store.publish(EntityType.DATASOURCE, ds1); + Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.DATASOURCE, "mysql-db")).intValue(), 0); + + Datasource ds3 = createDatasourceObj(); + ds3.setDescription("changed the description"); + store.initiateUpdate(ds3); + store.update(EntityType.DATASOURCE, ds3); + store.cleanupUpdateInit(); + Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.DATASOURCE, "mysql-db")).intValue(), 0); + + Datasource ds2 = createDatasourceObj(); + ds2.getInterfaces().getInterfaces().get(0).setEndpoint("jdbc:mysql://host-2/test"); + store.initiateUpdate(ds2); + store.update(EntityType.DATASOURCE, ds2); + store.cleanupUpdateInit(); + Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.DATASOURCE, "mysql-db")).intValue(), 1); + } + + private Cluster createClusterObj() { Cluster cluster = new Cluster(); cluster.setName("cluster1"); @@ -139,6 +168,26 @@ public class ConfigurationStoreTest { return cluster; } + private Datasource createDatasourceObj() { + Datasource datasource = new Datasource(); + datasource.setName("mysql-db"); + datasource.setColo("colo1"); + datasource.setDescription("mysql database"); + datasource.setType(DatasourceType.MYSQL); + Interface readInterface = new Interface(); + readInterface.setType(Interfacetype.READONLY); + readInterface.setEndpoint("jdbc:mysql://host-1/test"); + Credential cred = new Credential(); + cred.setUserName("test"); + cred.setType(Credentialtype.PASSWORD_TEXT); + cred.setPasswordText("password"); + readInterface.setCredential(cred); + datasource.setInterfaces(new Interfaces()); + datasource.getInterfaces().getInterfaces().add(readInterface); + datasource.setVersion(0); + return datasource; + } + @Test public void testGet() throws Exception { Process p = store.get(EntityType.PROCESS, "notfound"); http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java 
---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java index 52b7103..826686f 100644 --- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java +++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java @@ -28,6 +28,8 @@ import org.apache.falcon.entity.parser.EntityParserFactory; import org.apache.falcon.entity.parser.FeedEntityParser; import org.apache.falcon.entity.parser.ProcessEntityParser; import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.datasource.Datasource; +import org.apache.falcon.entity.v0.datasource.Credential; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; @@ -55,6 +57,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import java.io.IOException; import java.io.InputStream; @@ -87,6 +90,8 @@ public class UpdateHelperTest extends AbstractTestBase { storeEntity(EntityType.FEED, "impressionFeed"); storeEntity(EntityType.FEED, "imp-click-join1"); storeEntity(EntityType.FEED, "imp-click-join2"); + storeEntity(EntityType.DATASOURCE, "datasource1"); + storeEntity(EntityType.DATASOURCE, "datasource2"); } private void prepare(Process process) throws IOException, FalconException { @@ -365,6 +370,44 @@ public class UpdateHelperTest extends AbstractTestBase { Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity)); } + @Test + public void testIsDatasourceEntityUpdated() throws Exception { + Unmarshaller unmarshaller = EntityType.DATASOURCE.getUnmarshaller(); + + String datasource = "datasource1"; + Datasource datasourceEntity = 
ConfigurationStore.get().get(EntityType.DATASOURCE, datasource); + Datasource newDatasourceEntity = getNewDatasource(unmarshaller, datasource); + Assert.assertNotNull(newDatasourceEntity); + + // Tags, ACL, description, colo update should not update bundle/workflow for dependent entities + org.apache.falcon.entity.v0.datasource.ACL acl = new org.apache.falcon.entity.v0.datasource.ACL(); + acl.setOwner("Test"); + acl.setGroup("testGroup"); + acl.setPermission("*"); + newDatasourceEntity.setACL(acl); + newDatasourceEntity.setDescription("New Description"); + newDatasourceEntity.setTags("test=val,test2=val2"); + newDatasourceEntity.setColo("newColo2"); + Assert.assertFalse(UpdateHelper.isDatasourceEntityUpdated(datasourceEntity, newDatasourceEntity)); + + // Changing read or write endpoint should trigger rewrite + newDatasourceEntity.getInterfaces().getInterfaces().get(0).setEndpoint("jdbc:hsqldb:localhost2/db1"); + Assert.assertTrue(UpdateHelper.isDatasourceEntityUpdated(datasourceEntity, newDatasourceEntity)); + + // change credential type or value should trigger + newDatasourceEntity = getNewDatasource(unmarshaller, datasource); + Credential cred = newDatasourceEntity.getInterfaces().getInterfaces().get(0).getCredential(); + cred.setPasswordText("blah"); + Assert.assertTrue(UpdateHelper.isDatasourceEntityUpdated(datasourceEntity, newDatasourceEntity)); + } + + private Datasource getNewDatasource(Unmarshaller unmarshaller, String datasource) throws JAXBException { + Datasource newDatasourceEntity = (Datasource) unmarshaller.unmarshal(this.getClass() + .getResource(DATASOURCE_XML)); + newDatasourceEntity.setName(datasource); + return newDatasourceEntity; + } + private static Location getLocation(Feed feed, LocationType type, String cluster) { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster); if (feedCluster.getLocations() != null) { 
http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 5fa345d..6c9237b 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -39,7 +39,9 @@ import org.apache.falcon.entity.v0.EntityGraph; import org.apache.falcon.entity.v0.EntityIntegrityChecker; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.resource.APIResult.Status; @@ -334,26 +336,41 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { validateUpdate(oldEntity, newEntity); configStore.initiateUpdate(newEntity); - obtainEntityLocks(oldEntity, "update", tokenList); StringBuilder result = new StringBuilder("Updated successfully"); - //Update in workflow engine if entity is not a cluster (cluster entity is not scheduled) - if (!DeploymentUtil.isPrism() && !entityType.equals(EntityType.CLUSTER)) { - Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity); - Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity); - newClusters.retainAll(oldClusters); //common clusters for update - oldClusters.removeAll(newClusters); //deleted clusters - - for (String cluster : newClusters) { - result.append(getWorkflowEngine(oldEntity).update(oldEntity, newEntity, cluster, skipDryRun)); + switch(entityType) { 
+ case CLUSTER: + configStore.update(entityType, newEntity); + break; + case DATASOURCE: + configStore.update(entityType, newEntity); + // check always if dependant feeds are already upgraded and upgrade accordingly + if (entityType.equals(EntityType.DATASOURCE)) { + ConfigurationStore.get().cleanupUpdateInit(); + releaseEntityLocks(entityName, tokenList); + updateDatasourceDependents(entityName, skipDryRun); } - for (String cluster : oldClusters) { - getWorkflowEngine(oldEntity).delete(oldEntity, cluster); + break; + case FEED: + case PROCESS: + if (!DeploymentUtil.isPrism()) { + Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity); + Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity); + newClusters.retainAll(oldClusters); //common clusters for update + oldClusters.removeAll(newClusters); //deleted clusters + for (String cluster : newClusters) { + result.append(getWorkflowEngine(oldEntity).update(oldEntity, newEntity, cluster, skipDryRun)); + } + for (String cluster : oldClusters) { + getWorkflowEngine(oldEntity).delete(oldEntity, cluster); + } } + configStore.update(entityType, newEntity); + break; + default: + throw FalconWebException.newAPIException("Unknown entity type in update : " + entityType); } - - configStore.update(entityType, newEntity); return new APIResult(APIResult.Status.SUCCEEDED, result.toString()); } catch (Throwable e) { LOG.error("Update failed", e); @@ -365,6 +382,83 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } /** + * check if the data source entity dependent feeds are upgraded or not by checking against the data source entity + * version and upgrade feeds accordingly. 
+     *
+     * @param datasourceName Name of the data source entity
+     * @param skipDryRun Skip dry run during update if set to true
+     * @return APIResult with SUCCEEDED status carrying the collected workflow engine update messages, or a
+     *         note that the data source has no dependent entities
+     */
+    public APIResult updateDatasourceDependents(String datasourceName, Boolean skipDryRun) {
+        try {
+            Datasource datasource = EntityUtil.getEntity(EntityType.DATASOURCE, datasourceName);
+            StringBuilder result = new StringBuilder(String.format("Updating feed entities "
+                    + "dependent on datasource : %s ", datasource.getName()));
+
+            // look up the entities referencing this data source and compare the version each one references
+            Pair<String, EntityType>[] dependentEntities = EntityIntegrityChecker.referencedBy(datasource);
+            if (dependentEntities == null) {
+                return new APIResult(APIResult.Status.SUCCEEDED, String.format("Datasource %s has "
+                        + "no dependent entities", datasourceName));
+            }
+            for (Pair<String, EntityType> depEntity : dependentEntities) {
+                Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first);
+                if (entity.getEntityType() != EntityType.FEED) {
+                    throw FalconWebException.newAPIException("Datasource dependents should be FEEDS, but "
+                            + "encountered type : " + entity.getEntityType());
+                }
+                Feed newFeed = (Feed) entity.copy();
+                for (org.apache.falcon.entity.v0.feed.Cluster feedCluster
+                        : newFeed.getClusters().getClusters()) {
+                    if (feedCluster.getType() == ClusterType.SOURCE) {
+                        boolean updatedFeed = isUpdateFeedDatasourceVersion(feedCluster, datasource, newFeed);
+                        if (updatedFeed) {
+                            // rewrite the dependent feed and update it on the store
+                            result.append(getWorkflowEngine(entity).update(entity, newFeed,
+                                    feedCluster.getName(), skipDryRun));
+                            updateEntityInConfigStore(entity, newFeed);
+                        }
+                    }
+                }
+            }
+            return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
+        } catch (FalconException e) {
+            LOG.error("Update failed", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /** Bumps the import-source and export-target datasource versions of a feed cluster; true if either changed. */
+    private boolean isUpdateFeedDatasourceVersion(org.apache.falcon.entity.v0.feed.Cluster feedCluster,
+            Datasource datasource, Feed feed) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Datasource updateFeedImp = incFeedDatasourceVersion(datasource,
+                feed, feedCluster.getImport() != null ? feedCluster.getImport().getSource() : null);
+        org.apache.falcon.entity.v0.feed.Datasource updateFeedExp = incFeedDatasourceVersion(datasource,
+                feed, feedCluster.getExport() != null ? feedCluster.getExport().getTarget() : null);
+        return ((updateFeedImp != null) || (updateFeedExp != null));
+    }
+
+    /** Bumps a lagging feed-side datasource version by one; returns null if unchanged, throws if it is ahead. */
+    private org.apache.falcon.entity.v0.feed.Datasource incFeedDatasourceVersion(Datasource datasource,
+            Feed feed, org.apache.falcon.entity.v0.feed.Datasource depDatasource) throws FalconException {
+        if ((depDatasource != null) && (datasource.getName().equals(depDatasource.getName()))) {
+            if (depDatasource.getVersion() < datasource.getVersion()) {
+                LOG.info(String.format("Updating since Feed '%s' referenced datasource '%s' "
+                        + "version '%d' < datasource entity version in store '%d'", feed.getName(),
+                        depDatasource.getName(), depDatasource.getVersion(), datasource.getVersion()));
+                depDatasource.setVersion(depDatasource.getVersion() + 1);
+                return depDatasource;
+            } else if (depDatasource.getVersion() > datasource.getVersion()) {
+                throw new FalconException(String.format("Feed '%s' datasource '%s' version '%d' > datasource "
+                        + "entity version in store '%d'", feed.getName(), depDatasource.getName(),
+                        depDatasource.getVersion(), datasource.getVersion()));
+            }
+        }
+        return null;
+    }
+
+    /**
      * Updates scheduled dependent entities of a cluster.
* * @param clusterName Name of cluster
http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java
new file mode 100644
index 0000000..9a11751
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/DatasourceUpdateIT.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Integration test for Datasource Update: submits a cluster, a datasource and a feed that uses it, then
+ * updates the datasource via TestContext.executeWithURL and expects every command to return 0.
+ */
+public class DatasourceUpdateIT {
+    public static final Logger LOG = LoggerFactory.getLogger(DatasourceUpdateIT.class);
+
+    private static final String DATASOURCE_NAME_KEY = "datasourcename";
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        HsqldbTestUtils.start();
+        HsqldbTestUtils.createSqoopUser("sqoop_user", "sqoop");
+        HsqldbTestUtils.createSqoopUser("sqoop_user2", "sqoop");
+        HsqldbTestUtils.changeSAPassword("sqoop");
+        HsqldbTestUtils.createAndPopulateCustomerTable();
+
+        TestContext.cleanupStore();
+        TestContext.prepare();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        HsqldbTestUtils.tearDown();
+        FileUtils.deleteDirectory(new File("../localhost/"));
+        FileUtils.deleteDirectory(new File("localhost"));
+    }
+
+    @Test
+    public void testHSql() throws Exception {
+        Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+    }
+
+    /** Submits cluster, datasource and feed, then updates the datasource from DATASOURCE_TEMPLATE5. */
+    @Test
+    public void testDatasourceUpdate() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        // Make a new datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_TEMPLATE3
+        // are populated with the same datasource name
+        String dsName = "datasource-test-1";
+        overlay.put(DATASOURCE_NAME_KEY, dsName);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay);
+        LOG.info("Submit datasource entity {} via entity -submit -type datasource -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type datasource -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        LOG.info("Submit feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // instantiate another datasource with same name but different end point
+        overlay.put(DATASOURCE_NAME_KEY, dsName);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE5, overlay);
+        LOG.info("update datasource {} via -update -type datasource -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -update -type datasource -file " + filePath));
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 5412608..5173032 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -95,6 +95,7 @@ public class TestContext extends AbstractTestContext {
     public static final String DATASOURCE_TEMPLATE2 = "/datasource-template2.xml";
     public static final String DATASOURCE_TEMPLATE3 = "/datasource-template3.xml";
     public static final String DATASOURCE_TEMPLATE4 = "/datasource-template4.xml";
+    public static final String DATASOURCE_TEMPLATE5 = "/datasource-template5.xml";
     public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
     public static final String CLUSTER_UPDATED_TEMPLATE = "/cluster-updated-template.xml";
     public static final String PIG_PROCESS_TEMPLATE = "/pig-process-template.xml";
http://git-wip-us.apache.org/repos/asf/falcon/blob/2b84dd56/webapp/src/test/resources/datasource-template5.xml
----------------------------------------------------------------------
diff --git
a/webapp/src/test/resources/datasource-template5.xml b/webapp/src/test/resources/datasource-template5.xml new file mode 100644 index 0000000..f5414b8 --- /dev/null +++ b/webapp/src/test/resources/datasource-template5.xml @@ -0,0 +1,46 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<datasource colo="##colo##" description="" type="hsql" name="##datasourcename##" xmlns="uri:falcon:datasource:0.1"> + <interfaces> + <interface type="readonly" endpoint="jdbc:hsqldb:hsql://localhost/db1"> + <credential type="password-text"> + <userName>sqoop_user2</userName> + <passwordText>sqoop</passwordText> + </credential> + </interface> + + <interface type="write" endpoint="jdbc:hsqldb:hsql://localhost/db1"> + <credential type="password-text"> + <userName>sqoop_user</userName> + <passwordText>sqoop</passwordText> + </credential> + </interface> + + <credential type="password-text"> + <userName>sqoop_user</userName> + <passwordText>sqoop</passwordText> + </credential> + </interfaces> + + <driver> + <clazz>org.hsqldb.jdbcDriver</clazz> + <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar> + </driver> +</datasource> \ No newline at end of file
