Repository: falcon
Updated Branches:
  refs/heads/master 9ec4c23a1 -> aec6084e7
FALCON-1836 Import from database to HCatalog Author: Venkatesan Ramachandran <[email protected]> Reviewers: "Balu Vellanki <[email protected]>, Ajay Yadava <[email protected]>, Peeyush Bishnoi <[email protected]>" Closes #61 from vramachan/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aec6084e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aec6084e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aec6084e Branch: refs/heads/master Commit: aec6084e7df82574c0b41063ab0f4a4115cbf25d Parents: 9ec4c23 Author: Venkatesan Ramachandran <[email protected]> Authored: Mon Mar 14 14:34:28 2016 -0700 Committer: bvellanki <[email protected]> Committed: Mon Mar 14 14:34:28 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/falcon/entity/HiveUtil.java | 4 +- .../org/apache/falcon/entity/HiveUtilTest.java | 4 +- .../engine/oozie/utils/OozieBuilderUtils.java | 4 +- oozie/pom.xml | 19 +++ .../oozie/DatabaseExportWorkflowBuilder.java | 116 ++++++++++++++----- .../oozie/DatabaseImportWorkflowBuilder.java | 115 ++++++++++++++---- .../falcon/oozie/ExportWorkflowBuilder.java | 5 +- .../oozie/FeedExportCoordinatorBuilder.java | 14 +-- .../apache/falcon/oozie/ImportExportCommon.java | 46 +++++++- .../falcon/oozie/ImportWorkflowBuilder.java | 5 +- .../OozieOrchestrationWorkflowBuilder.java | 6 +- .../java/org/apache/falcon/util/OozieUtils.java | 28 +++++ .../apache/falcon/lifecycle/FeedExportIT.java | 115 ++++++++++++++++++ .../apache/falcon/lifecycle/FeedImportIT.java | 56 ++++++++- .../falcon/resource/AbstractTestContext.java | 3 +- .../test/resources/feed-export-template6.xml | 56 +++++++++ webapp/src/test/resources/feed-template5.xml | 55 +++++++++ 17 files changed, 575 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/common/src/main/java/org/apache/falcon/entity/HiveUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java index f4029e4..f8eaebb 100644 --- a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java @@ -29,7 +29,7 @@ import java.util.Properties; */ public final class HiveUtil { public static final String METASTOREURIS = "hive.metastore.uris"; - public static final String METASTROE_URI = "hcat.metastore.uri"; + public static final String METASTORE_URI = "hcat.metastore.uri"; public static final String NODE = "hcatNode"; public static final String METASTORE_UGI = "hive.metastore.execute.setugi"; @@ -48,7 +48,7 @@ public final class HiveUtil { hiveCredentials.put(METASTOREURIS, metaStoreUrl); hiveCredentials.put(METASTORE_UGI, "true"); hiveCredentials.put(NODE, metaStoreUrl.replace("thrift", "hcat")); - hiveCredentials.put(METASTROE_URI, metaStoreUrl); + hiveCredentials.put(METASTORE_URI, metaStoreUrl); if (SecurityUtil.isSecurityEnabled()) { String principal = ClusterHelper http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java index 
c37cebd..7f890f3 100644 --- a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java +++ b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java @@ -55,7 +55,7 @@ public class HiveUtilTest { Properties expected = new Properties(); expected.put(HiveUtil.METASTORE_UGI, "true"); expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat")); - expected.put(HiveUtil.METASTROE_URI, metaStoreUrl); + expected.put(HiveUtil.METASTORE_URI, metaStoreUrl); expected.put(HiveUtil.METASTOREURIS, metaStoreUrl); Properties actual = HiveUtil.getHiveCredentials(cluster); @@ -91,7 +91,7 @@ public class HiveUtilTest { expected.put(SecurityUtil.METASTORE_PRINCIPAL, principal); expected.put(HiveUtil.METASTORE_UGI, "true"); expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat")); - expected.put(HiveUtil.METASTROE_URI, metaStoreUrl); + expected.put(HiveUtil.METASTORE_URI, metaStoreUrl); expected.put(HiveUtil.METASTOREURIS, metaStoreUrl); Properties actual = HiveUtil.getHiveCredentials(cluster); http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java index 732a9e7..8f1b53b 100644 --- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java @@ -417,7 +417,7 @@ public final class OozieBuilderUtils { credential.setName(credentialName); credential.setType("hcat"); - credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl)); + credential.getProperty().add(createProperty(HiveUtil.METASTORE_URI, metaStoreUrl)); credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL, ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL))); @@ -441,7 +441,7 @@ public final class OozieBuilderUtils { hiveCredentials.put(HiveUtil.METASTOREURIS, metaStoreUrl); hiveCredentials.put(HiveUtil.METASTORE_UGI, "true"); hiveCredentials.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat")); - hiveCredentials.put(HiveUtil.METASTROE_URI, metaStoreUrl); + hiveCredentials.put(HiveUtil.METASTORE_URI, metaStoreUrl); if (SecurityUtil.isSecurityEnabled()) { String principal = ClusterHelper http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/pom.xml ---------------------------------------------------------------------- diff --git a/oozie/pom.xml b/oozie/pom.xml index 4623d8b..c53d33c 100644 --- a/oozie/pom.xml +++ b/oozie/pom.xml @@ -186,6 +186,25 @@ </configuration> </execution> <execution> + <id>sqoop-gen</id> + <goals> + <goal>generate</goal> + </goals> + <configuration> + <forceRegenerate>true</forceRegenerate> + <generatePackage>org.apache.falcon.oozie.sqoop</generatePackage> + <schemas> + <schema> + <dependencyResource> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-client</artifactId> + <resource>sqoop-action-0.3.xsd</resource> + </dependencyResource> + </schema> + </schemas> + </configuration> + </execution> + <execution> <id>bundle-gen</id> <goals> <goal>generate</goal> http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java 
---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java index d69611b..284c4a3 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java @@ -18,18 +18,25 @@ package org.apache.falcon.oozie; +import com.google.common.base.Splitter; import org.apache.falcon.FalconException; import org.apache.falcon.Tag; import org.apache.falcon.entity.DatasourceHelper; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.datasource.Datasource; +import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LoadMethod; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.OozieUtils; import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.hadoop.fs.Path; +import javax.xml.bind.JAXBElement; +import java.util.Iterator; import java.util.Map; import java.util.Properties; @@ -49,22 +56,27 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder { } @Override - protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException { + protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath) + throws FalconException { - addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT); + ACTION action = unmarshalAction(EXPORT_SQOOP_ACTION_TEMPLATE); + JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action); + org.apache.falcon.oozie.sqoop.ACTION sqoopExport = actionJaxbElement.getValue(); + + Properties props = new Properties(); + ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath); + sqoopExport.getJobXml().add("${wf:appPath()}/conf/hive-site.xml"); + OozieUtils.marshalSqoopAction(action, actionJaxbElement); - ACTION sqoopExport = unmarshalAction(EXPORT_SQOOP_ACTION_TEMPLATE); - addTransition(sqoopExport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); - workflow.getDecisionOrForkOrJoin().add(sqoopExport); + addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + workflow.getDecisionOrForkOrJoin().add(action); //Add post-processing actions ACTION success = getSuccessPostProcessAction(); - // delete addHDFSServersConfig(success, src, target); addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(success); ACTION fail = getFailPostProcessAction(); - // delete addHDFSServersConfig(fail, src, target); addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(fail); @@ -74,7 +86,6 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder { // build the sqoop command and put it in the properties String sqoopCmd = buildSqoopCommand(cluster, entity); LOG.info("SQOOP EXPORT COMMAND : " + sqoopCmd); - Properties props = new Properties(); props.put("sqoopCommand", sqoopCmd); return props; } @@ -86,28 +97,21 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder { buildConnectArg(sqoopArgs, 
cluster).append(ImportExportCommon.ARG_SEPARATOR); buildTableArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); - ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, cluster, entity) + Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName( + FeedHelper.getCluster(entity, cluster.getName()))); + ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, datasource) .append(ImportExportCommon.ARG_SEPARATOR); buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR); - buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR); + buildArguments(sqoopArgs, extraArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR); buildLoadType(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); - buildExportDirArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); + buildExportArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR); - StringBuffer sqoopCmd = new StringBuffer(); + StringBuilder sqoopCmd = new StringBuilder(); return sqoopCmd.append("export").append(ImportExportCommon.ARG_SEPARATOR) .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR) .append(sqoopArgs).toString(); } - private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); - Datasource db = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName(feedCluster)); - if ((db.getDriver() != null) && (db.getDriver().getClazz() != null)) { - builder.append("--driver").append(ImportExportCommon.ARG_SEPARATOR).append(db.getDriver().getClazz()); - } - return builder; - } - private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); return builder.append("--connect").append(ImportExportCommon.ARG_SEPARATOR) @@ -132,18 +136,32 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder { return builder.append(modeType); } + private StringBuilder buildExportArg(StringBuilder builder, Feed feed, Cluster cluster) + throws FalconException { + Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster); + if (feedStorageType == Storage.TYPE.TABLE) { + return buildExportTableArg(builder, feed.getTable()); + } else { + return buildExportDirArg(builder, cluster); + } + } + private StringBuilder buildExportDirArg(StringBuilder builder, Cluster cluster) throws FalconException { return builder.append("--export-dir").append(ImportExportCommon.ARG_SEPARATOR) - .append(String.format("${coord:dataIn('%s')}", - FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME)); + .append(String.format("${coord:dataIn('%s')}", + FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME)); } - private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs) - throws FalconException { + private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs, Feed feed, + Cluster cluster) throws FalconException { + Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster); for(Map.Entry<String, String> e : extraArgs.entrySet()) { + if ((feedStorageType == Storage.TYPE.TABLE) && (e.getKey().equals("--update-key"))) { + continue; + } builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue()) - 
.append(ImportExportCommon.ARG_SEPARATOR); + .append(ImportExportCommon.ARG_SEPARATOR); } return builder; } @@ -169,4 +187,50 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); return FeedHelper.getExportArguments(feedCluster); } + + private StringBuilder buildExportTableArg(StringBuilder builder, CatalogTable catalog) throws FalconException { + + LOG.info("Catalog URI {}", catalog.getUri()); + builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR); + Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator(); + String dbTable = itr.next(); + String partitions = itr.next(); + Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator(); + itrDbTable.next(); + String db = itrDbTable.next(); + String table = itrDbTable.next(); + LOG.debug("Target database {}, table {}", db, table); + builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR) + .append(String.format("${coord:databaseIn('%s')}", FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME)) + .append(ImportExportCommon.ARG_SEPARATOR); + + builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR) + .append(String.format("${coord:tableIn('%s')}", FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME)) + .append(ImportExportCommon.ARG_SEPARATOR); + + Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions); + if (partitionsMap.size() > 0) { + StringBuilder partitionKeys = new StringBuilder(); + StringBuilder partitionValues = new StringBuilder(); + for (Map.Entry<String, String> e : partitionsMap.entrySet()) { + partitionKeys.append(e.getKey()); + partitionKeys.append(','); + partitionValues.append(String.format("${coord:dataInPartitionMin('%s','%s')}", + FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME, + e.getKey())); + partitionValues.append(','); + } + if (partitionsMap.size() > 0) { + partitionKeys.setLength(partitionKeys.length()-1); + partitionValues.setLength(partitionValues.length()-1); + } + LOG.debug("partitionKeys {} and partitionValue {}", partitionKeys.toString(), partitionValues.toString()); + builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR) + .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR); + builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR) + .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR); + } + return builder; + } } + http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java index 66bfa9b..3e24428 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java @@ -18,25 +18,35 @@ package org.apache.falcon.oozie; +import com.google.common.base.Splitter; import org.apache.falcon.FalconException; import org.apache.falcon.Tag; import org.apache.falcon.entity.DatasourceHelper; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.cluster.Cluster; import 
org.apache.falcon.entity.v0.datasource.Datasource; +import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.OozieUtils; import org.apache.falcon.workflow.WorkflowExecutionContext; +import java.util.Iterator; import java.util.Map; import java.util.Properties; +import org.apache.hadoop.fs.Path; + +import javax.xml.bind.JAXBElement; + /** * Builds Datasource import workflow for Oozie. */ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { + protected static final String IMPORT_SQOOP_ACTION_TEMPLATE = "/action/feed/import-sqoop-database-action.xml"; protected static final String IMPORT_ACTION_NAME="db-import-sqoop"; @@ -48,23 +58,27 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { } @Override - protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException { + protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath) + throws FalconException { - addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT); + ACTION action = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE); + JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action); + org.apache.falcon.oozie.sqoop.ACTION sqoopImport = actionJaxbElement.getValue(); + + Properties props = new Properties(); + ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath); + sqoopImport.getJobXml().add("${wf:appPath()}/conf/hive-site.xml"); + OozieUtils.marshalSqoopAction(action, actionJaxbElement); - ACTION sqoopImport = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE); - // delete addHDFSServersConfig(sqoopImport, src, target); - addTransition(sqoopImport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); - workflow.getDecisionOrForkOrJoin().add(sqoopImport); + addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + workflow.getDecisionOrForkOrJoin().add(action); //Add post-processing actions ACTION success = getSuccessPostProcessAction(); - // delete addHDFSServersConfig(success, src, target); addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(success); ACTION fail = getFailPostProcessAction(); - // delete addHDFSServersConfig(fail, src, target); addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(fail); @@ -73,8 +87,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { // build the sqoop command and put it in the properties String sqoopCmd = buildSqoopCommand(cluster, entity); - LOG.info("SQOOP COMMAND : " + sqoopCmd); - Properties props = new Properties(); + LOG.info("SQOOP IMPORT COMMAND : " + sqoopCmd); props.put("sqoopCommand", sqoopCmd); return props; } @@ -86,16 +99,18 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { buildDriverArgs(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); buildConnectArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); buildTableArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); - ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, cluster, entity) - .append(ImportExportCommon.ARG_SEPARATOR); + Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName( + FeedHelper.getCluster(entity, 
cluster.getName()))); + ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, datasource) + .append(ImportExportCommon.ARG_SEPARATOR); buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR); buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR); - buildTargetDirArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR); + buildTargetArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR); - StringBuffer sqoopCmd = new StringBuffer(); + StringBuilder sqoopCmd = new StringBuilder(); return sqoopCmd.append("import").append(ImportExportCommon.ARG_SEPARATOR) - .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR) - .append(sqoopArgs).toString(); + .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR) + .append(sqoopArgs).toString(); } private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException { @@ -110,29 +125,40 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); return builder.append("--connect").append(ImportExportCommon.ARG_SEPARATOR) - .append(DatasourceHelper.getReadOnlyEndpoint( - DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster)))); + .append(DatasourceHelper.getReadOnlyEndpoint( + DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster)))); } private StringBuilder buildTableArg(StringBuilder builder, Cluster cluster) throws FalconException { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); return builder.append("--table").append(ImportExportCommon.ARG_SEPARATOR) - .append(FeedHelper.getImportDataSourceTableName(feedCluster)); + .append(FeedHelper.getImportDataSourceTableName(feedCluster)); } - private StringBuilder buildTargetDirArg(StringBuilder builder, Cluster cluster) + private StringBuilder buildTargetArg(StringBuilder builder, Feed feed, Cluster cluster) + throws FalconException { + Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster); + if (feedStorageType == Storage.TYPE.TABLE) { + return buildTargetTableArg(builder, feed.getTable()); + + } else { + return buildTargetDirArg(builder); + } + } + + private StringBuilder buildTargetDirArg(StringBuilder builder) throws FalconException { return builder.append("--delete-target-dir").append(ImportExportCommon.ARG_SEPARATOR) .append("--target-dir").append(ImportExportCommon.ARG_SEPARATOR) .append(String.format("${coord:dataOut('%s')}", - FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME)); + FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME)); } private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs) throws FalconException { for(Map.Entry<String, String> e : extraArgs.entrySet()) { builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue()) - .append(ImportExportCommon.ARG_SEPARATOR); + .append(ImportExportCommon.ARG_SEPARATOR); } return builder; } @@ -158,4 +184,49 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); return FeedHelper.getImportArguments(feedCluster); } + + private StringBuilder buildTargetTableArg(StringBuilder builder, CatalogTable 
catalog) throws FalconException { + + LOG.info("Catalog URI {}", catalog.getUri()); + builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR); + Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator(); + String dbTable = itr.next(); + String partitions = itr.next(); + Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator(); + itrDbTable.next(); + String db = itrDbTable.next(); + String table = itrDbTable.next(); + LOG.debug("Target database {}, table {}", db, table); + builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR) + .append(String.format("${coord:databaseOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME)) + .append(ImportExportCommon.ARG_SEPARATOR); + + builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR) + .append(String.format("${coord:tableOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME)) + .append(ImportExportCommon.ARG_SEPARATOR); + + Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions); + if (partitionsMap.size() > 0) { + StringBuilder partitionKeys = new StringBuilder(); + StringBuilder partitionValues = new StringBuilder(); + for (Map.Entry<String, String> e : partitionsMap.entrySet()) { + partitionKeys.append(e.getKey()); + partitionKeys.append(','); + partitionValues.append(String.format("${coord:dataOutPartitionValue('%s','%s')}", + FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME, + e.getKey())); + partitionValues.append(','); + } + if (partitionsMap.size() > 0) { + partitionKeys.setLength(partitionKeys.length()-1); + partitionValues.setLength(partitionValues.length()-1); + } + LOG.debug("partitionKeys {} and partitionValue {}", partitionKeys.toString(), partitionValues.toString()); + builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR) + .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR); + builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR) + .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR); + } + return builder; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java index a55656c..af7431a 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java @@ -47,7 +47,7 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu WORKFLOWAPP workflow = new WORKFLOWAPP(); String wfName = EntityUtil.getWorkflowName(Tag.EXPORT, entity).toString(); workflow.setName(wfName); - Properties p = getWorkflow(cluster, workflow); + Properties p = getWorkflow(cluster, workflow, buildPath); marshal(cluster, workflow, buildPath); Properties props = FeedHelper.getFeedProperties(entity); @@ -81,5 +81,6 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu return props; } - protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException; + protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath) + throws FalconException; } 
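
The DatabaseImportWorkflowBuilder and DatabaseExportWorkflowBuilder changes above derive the HCatalog database, table, and partition spec by splitting the feed's catalog URI with Guava's Splitter, and ImportExportCommon.getPartitionKeyValues() turns the partition spec into a map. For illustration, a minimal, self-contained sketch of that parsing follows; the class name and the literal URI are made-up examples, not code from the commit.

import com.google.common.base.Splitter;

import java.util.Iterator;
import java.util.Map;

public final class CatalogUriParseSketch {

    private CatalogUriParseSketch() {
    }

    public static void main(String[] args) {
        // Same shape as the table URI used by the new feed-template5.xml test resource
        // (concrete values substituted here so the sketch runs standalone).
        String uri = "catalog:SqoopTestDB:SqoopTestTable#year=2016;month=03;day=14;hour=01";

        // Split the "catalog:db:table" part from the "#"-delimited partition spec.
        Iterator<String> itr = Splitter.on("#").split(uri).iterator();
        String dbTable = itr.next();
        String partitions = itr.next();

        // "catalog:db:table" -> skip the scheme, then read database and table.
        Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
        itrDbTable.next();
        String db = itrDbTable.next();
        String table = itrDbTable.next();

        // "k1=v1;k2=v2" -> order-preserving map, as in ImportExportCommon.getPartitionKeyValues().
        Map<String, String> partitionKeyValues =
                Splitter.on(";").withKeyValueSeparator("=").split(partitions);

        System.out.println(db + "." + table + " partitions=" + partitionKeyValues);
    }
}

In the generated workflows the database and table themselves are supplied through the coord:databaseIn/tableIn (export) or coord:databaseOut/tableOut (import) EL functions rather than these literals; the parsed partition keys drive the --hcatalog-partition-keys and --hcatalog-partition-values arguments.
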
http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java index 1bfacc2..4437d8b 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java @@ -53,7 +53,6 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> } public static final String EXPORT_DATASET_NAME = "export-dataset"; - public static final String EXPORT_DATAIN_NAME = "export-input"; private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedExportCoordinatorBuilder.class); @@ -63,19 +62,19 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException { LOG.info("Generating Feed EXPORT coordinator."); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster((Feed) entity, cluster.getName()); + org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); if (!FeedHelper.isExportEnabled(feedCluster)) { return null; } - if (feedCluster.getValidity().getEnd().before(new Date())) { + if ((feedCluster.getValidity() != null) && (feedCluster.getValidity().getEnd().before(new Date()))) { LOG.warn("Feed IMPORT is not applicable as Feed's end time for cluster {} is not in the future", cluster.getName()); return null; } COORDINATORAPP coord = new COORDINATORAPP(); - initializeCoordAttributes(coord, (Feed) entity, cluster); + initializeCoordAttributes(coord, entity, cluster); Properties props = createCoordDefaultConfiguration(getEntityName()); initializeInputPath(coord, cluster, props); @@ -108,8 +107,8 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> coord.setInputEvents(new INPUTEVENTS()); } - Storage storage = FeedHelper.createStorage(cluster, (Feed) entity); - SYNCDATASET syncdataset = createDataSet((Feed) entity, cluster, storage, + Storage storage = FeedHelper.createStorage(cluster, entity); + SYNCDATASET syncdataset = createDataSet(entity, cluster, storage, EXPORT_DATASET_NAME, LocationType.DATA); if (syncdataset == null) { @@ -126,6 +125,7 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> datain.setDataset(EXPORT_DATASET_NAME); org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart())); + datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart())); return datain; } @@ -138,7 +138,7 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> * @param storage * @param datasetName * @param locationType - * @return + * @return Sync dataset * @throws FalconException */ private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage, http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java 
b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java index 19b567c..52c7820 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java @@ -18,15 +18,27 @@ package org.apache.falcon.oozie; +import com.google.common.base.Splitter; import org.apache.falcon.FalconException; import org.apache.falcon.entity.DatasourceHelper; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.datasource.Credential; import org.apache.falcon.entity.v0.datasource.Credentialtype; import org.apache.falcon.entity.v0.datasource.Datasource; import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.security.SecurityUtil; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import java.net.URI; import java.net.URISyntaxException; @@ -38,13 +50,17 @@ public final class ImportExportCommon { static final String ARG_SEPARATOR = " "; + public static final Logger LOG = LoggerFactory.getLogger(ImportExportCommon.class); + + private static final Set<String> FALCON_IMPORT_SQOOP_ACTIONS = new HashSet<>( + Arrays.asList(new String[]{ OozieOrchestrationWorkflowBuilder.PREPROCESS_ACTION_NAME, + OozieOrchestrationWorkflowBuilder.USER_ACTION_NAME, })); + private ImportExportCommon() { } - static StringBuilder buildUserPasswordArg(StringBuilder builder, StringBuilder sqoopOpts, - Cluster cluster, Feed entity) throws FalconException { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); - Datasource db = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster)); + static StringBuilder buildUserPasswordArg(StringBuilder builder, StringBuilder sqoopOpts, Datasource db) + throws FalconException { Credential cred = DatasourceHelper.getReadPasswordInfo(db); builder.append("--username").append(ARG_SEPARATOR) .append(cred.getUserName()) @@ -70,4 +86,26 @@ public final class ImportExportCommon { } return builder; } + + public static void addHCatalogProperties(Properties props, Feed entity, Cluster cluster, + WORKFLOWAPP workflow, OozieOrchestrationWorkflowBuilder<Feed> wBuilder, Path buildPath) + throws FalconException { + if (FeedHelper.getStorageType(entity, cluster) == Storage.TYPE.TABLE) { + wBuilder.createHiveConfiguration(cluster, buildPath, ""); + addHCatalogShareLibs(props); + if (SecurityUtil.isSecurityEnabled()) { + // add hcatalog credentials for secure mode and add a reference to each action + wBuilder.addHCatalogCredentials(workflow, cluster, + OozieOrchestrationWorkflowBuilder.HIVE_CREDENTIAL_NAME, FALCON_IMPORT_SQOOP_ACTIONS); + } + } + } + private static void addHCatalogShareLibs(Properties props) throws FalconException { + props.put("oozie.action.sharelib.for.sqoop", "sqoop,hive,hcatalog"); + } + + public static Map<String, String> getPartitionKeyValues(String partitionStr) { + return Splitter.on(";").withKeyValueSeparator("=").split(partitionStr); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java index cae8497..2d93189 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java @@ -48,7 +48,7 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu WORKFLOWAPP workflow = new WORKFLOWAPP(); String wfName = EntityUtil.getWorkflowName(Tag.IMPORT, entity).toString(); workflow.setName(wfName); - Properties p = getWorkflow(cluster, workflow); + Properties p = getWorkflow(cluster, workflow, buildPath); marshal(cluster, workflow, buildPath); Properties props = FeedHelper.getFeedProperties(entity); @@ -81,5 +81,6 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu return props; } - protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException; + protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath) + throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index e137e11..181f2d2 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -76,7 +76,7 @@ import java.util.Set; * @param <T> */ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> { - protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth"; + public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth"; protected static final String USER_ACTION_NAME = "user-action"; protected static final String PREPROCESS_ACTION_NAME = "pre-processing"; @@ -329,7 +329,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend } // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster. 
- protected void createHiveConfiguration(Cluster cluster, Path workflowPath, + public void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException { Configuration hiveConf = getHiveCredentialsAsConf(cluster); @@ -413,7 +413,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend credential.setName(credentialName); credential.setType("hcat"); - credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl)); + credential.getProperty().add(createProperty(HiveUtil.METASTORE_URI, metaStoreUrl)); credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL, ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL))); http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java index 149a7e6..708788b 100644 --- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java +++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java @@ -46,6 +46,7 @@ public final class OozieUtils { public static final JAXBContext BUNDLE_JAXB_CONTEXT; public static final JAXBContext CONFIG_JAXB_CONTEXT; protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT; + protected static final JAXBContext SQOOP_ACTION_JAXB_CONTEXT; static { try { @@ -56,6 +57,8 @@ public final class OozieUtils { CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(CONFIGURATION.class); HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance( org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName()); + SQOOP_ACTION_JAXB_CONTEXT = JAXBContext.newInstance( + org.apache.falcon.oozie.sqoop.ACTION.class.getPackage().getName()); } catch (JAXBException e) { throw new RuntimeException("Unable to create JAXB context", e); } @@ -97,4 +100,29 @@ public final class OozieUtils { throw new RuntimeException("Unable to marshall hive action.", e); } } + + @SuppressWarnings("unchecked") + public static JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> unMarshalSqoopAction( + org.apache.falcon.oozie.workflow.ACTION wfAction) { + try { + Unmarshaller unmarshaller = SQOOP_ACTION_JAXB_CONTEXT.createUnmarshaller(); + unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler()); + return (JAXBElement<org.apache.falcon.oozie.sqoop.ACTION>) + unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny()); + } catch (JAXBException e) { + throw new RuntimeException("Unable to unmarshall sqoop action.", e); + } + } + + public static void marshalSqoopAction(org.apache.falcon.oozie.workflow.ACTION wfAction, + JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionjaxbElement) { + try { + DOMResult hiveActionDOM = new DOMResult(); + Marshaller marshaller = SQOOP_ACTION_JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(actionjaxbElement, hiveActionDOM); + wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement()); + } catch (JAXBException e) { + throw new RuntimeException("Unable to marshall sqoop action.", e); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java 
b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java new file mode 100644 index 0000000..194f4c7 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.lifecycle; + +import junit.framework.Assert; +import org.apache.commons.io.FileUtils; +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.resource.TestContext; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.HiveTestUtils; +import org.apache.falcon.util.HsqldbTestUtils; +import org.apache.hive.hcatalog.api.HCatClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Integration test for Feed Export. + */ + +@Test +public class FeedExportIT { + public static final Logger LOG = LoggerFactory.getLogger(FeedExportIT.class); + + private static final String DATASOURCE_NAME_KEY = "datasourcename"; + private static final String METASTORE_URL = "thrift://localhost:49083"; + private static final String DATABASE_NAME = "SqoopTestDB"; + private static final String TABLE_NAME = "SqoopTestTable"; + + private HCatClient client; + private CatalogStorage storage; + + @BeforeClass + public void setUp() throws Exception { + HsqldbTestUtils.start(); + HsqldbTestUtils.createSqoopUser("sqoop_user", "sqoop"); + HsqldbTestUtils.changeSAPassword("sqoop"); + HsqldbTestUtils.createAndPopulateCustomerTable(); + + TestContext.cleanupStore(); + TestContext.prepare(); + + // setup hcat + CurrentUser.authenticate(TestContext.REMOTE_USER); + client = TestContext.getHCatClient(METASTORE_URL); + + HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME); + List<String> partitionKeys = new ArrayList<>(); + partitionKeys.add("year"); + partitionKeys.add("month"); + partitionKeys.add("day"); + partitionKeys.add("hour"); + HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys); + } + + @AfterClass + public void tearDown() throws Exception { + HsqldbTestUtils.tearDown(); + FileUtils.deleteDirectory(new File("../localhost/")); + FileUtils.deleteDirectory(new File("localhost")); + } + + @Test + public void testFeedExportHSql() throws Exception { + Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows()); + } + + @Test + public void testSqoopExport() throws Exception { + TestContext context = new TestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, 
overlay); + context.setCluster(filePath); + LOG.info("entity -submit -type cluster -file " + filePath); + Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file " + filePath), 0); + + // Make a new datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_TEMPLATE3 + // are populated with the same datasource name + String dsName = "datasource-test-1"; + overlay.put(DATASOURCE_NAME_KEY, dsName); + filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay); + LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath); + Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_EXPORT_TEMPLATE6, overlay); + LOG.info("Submit export feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, + filePath); + Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath)); + } +} + + http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java index c34bcfc..2efe4bb 100644 --- a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java @@ -22,13 +22,17 @@ import junit.framework.Assert; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.CatalogStorage; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.resource.TestContext; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.HiveTestUtils; import org.apache.falcon.util.HsqldbTestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hive.hcatalog.api.HCatClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -37,6 +41,8 @@ import org.testng.annotations.Test; import java.io.File; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -48,6 +54,12 @@ public class FeedImportIT { public static final Logger LOG = LoggerFactory.getLogger(FeedImportIT.class); private static final String DATASOURCE_NAME_KEY = "datasourcename"; + private static final String METASTORE_URL = "thrift://localhost:49083"; + private static final String DATABASE_NAME = "SqoopTestDB"; + private static final String TABLE_NAME = "SqoopTestTable"; + + private HCatClient client; + private CatalogStorage storage; @BeforeClass public void setUp() throws Exception { @@ -58,6 +70,18 @@ public class FeedImportIT { TestContext.cleanupStore(); TestContext.prepare(); + + // setup hcat + CurrentUser.authenticate(TestContext.REMOTE_USER); + client = TestContext.getHCatClient(METASTORE_URL); + + HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME); + List<String> partitionKeys = new ArrayList<>(); + partitionKeys.add("year"); + partitionKeys.add("month"); + partitionKeys.add("day"); + partitionKeys.add("hour"); + 
HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys); } @AfterClass @@ -154,7 +178,8 @@ public class FeedImportIT { Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0); filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay); - LOG.info("Submit FEED entity with datasource {} via entity -submit -type feed -file {}", dsName, filePath); + LOG.info("Submit import FEED entity with datasource {} via entity -submit -type feed -file {}", + dsName, filePath); Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file " + filePath)); } @@ -200,7 +225,8 @@ public class FeedImportIT { Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0); filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay); - LOG.info("Submit FEED entity with datasource {} via entity -submit -type feed -file {}", dsName, filePath); + LOG.info("Submit import FEED entity with datasource {} via entity -submit -type feed -file {}", + dsName, filePath); Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file " + filePath)); } @@ -222,7 +248,31 @@ public class FeedImportIT { Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0); filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay); - LOG.info("Submit feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, filePath); + LOG.info("Submit import feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, + filePath); + Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath)); + } + + @Test + public void testSqoopHCatImport() throws Exception { + TestContext context = new TestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay); + context.setCluster(filePath); + LOG.info("entity -submit -type cluster -file " + filePath); + Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file " + filePath), 0); + + // Make a new datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_TEMPLATE3 + // are populated with the same datasource name + String dsName = "datasource-test-1"; + overlay.put(DATASOURCE_NAME_KEY, dsName); + filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay); + LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath); + Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE5, overlay); + LOG.info("Submit import feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, + filePath); Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath)); } http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java 
index 413dfde..ed27306 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java @@ -33,7 +33,8 @@ public abstract class AbstractTestContext { public static final String FEED_TEMPLATE1 = "/feed-template1.xml"; public static final String FEED_TEMPLATE2 = "/feed-template2.xml"; public static final String FEED_TEMPLATE3 = "/feed-template3.xml"; - + public static final String FEED_TEMPLATE5 = "/feed-template5.xml"; + public static final String FEED_EXPORT_TEMPLATE6 = "/feed-export-template6.xml"; public static final String PROCESS_TEMPLATE = "/process-template.xml"; protected static void mkdir(FileSystem fileSystem, Path path) throws Exception { http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/resources/feed-export-template6.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/feed-export-template6.xml b/webapp/src/test/resources/feed-export-template6.xml new file mode 100644 index 0000000..0eb748b --- /dev/null +++ b/webapp/src/test/resources/feed-export-template6.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1"> + <groups>input</groups> + + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(6)"/> + + <clusters> + <cluster name="##cluster##" type="source"> + <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/> + <retention limit="hours(24)" action="delete"/> + <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE --> + <export> + <target name="##datasourcename##" tableName="simple_export"> + <load type="allowinsert"/> + <fields> + <includes> + <field>id</field> + <field>name</field> + </includes> + </fields> + </target> + <arguments> + <argument name="--update-key" value="id"/> + </arguments> + </export> + </cluster> + </clusters> + + <locations> + <location type="data" path="/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/"/> + <location type="stats" path="/projects/falcon/clicksStats"/> + <location type="meta" path="/projects/falcon/clicksMetaData"/> + </locations> + + <ACL owner="##user##" group="group" permission="0x755"/> + <schema location="/schema/clicks" provider="protobuf"/> +</feed> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/resources/feed-template5.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/feed-template5.xml b/webapp/src/test/resources/feed-template5.xml new file mode 100644 index 0000000..150ce87 --- /dev/null +++ b/webapp/src/test/resources/feed-template5.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1"> + <groups>input</groups> + + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(6)"/> + + <clusters> + <cluster name="##cluster##" type="source"> + <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/> + <retention limit="hours(24)" action="delete"/> + <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE --> + <import> + <source name="##datasourcename##" tableName="simple"> + <extract type="full"> + <mergepolicy>snapshot</mergepolicy> + </extract> + <fields> + <includes> + <field>id</field> + <field>name</field> + </includes> + </fields> + </source> + <arguments> + <argument name="--split-by" value="id"/> + <argument name="--num-mappers" value="2"/> + </arguments> + </import> + </cluster> + </clusters> + + <table uri="catalog:SqoopTestDB:SqoopTestTable#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}"/> + + <ACL owner="##user##" group="group" permission="0x755"/> + <schema location="/schema/clicks" provider="protobuf"/> +</feed> \ No newline at end of file
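
As a closing illustration, here is a minimal sketch of how the builders join a partition map into the comma-separated --hcatalog-partition-keys and --hcatalog-partition-values arguments, including the trailing-comma trimming seen in buildTargetTableArg()/buildExportTableArg(); the class name and the 'import-output' dataset name inside the EL strings are placeholders, not values from the commit.

import java.util.LinkedHashMap;
import java.util.Map;

public final class HCatPartitionArgsSketch {

    private HCatPartitionArgsSketch() {
    }

    public static void main(String[] args) {
        // Keys come from the feed's catalog URI; the values are Oozie EL expressions that the
        // coordinator resolves at materialization time ('import-output' is a placeholder name).
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("year", "${coord:dataOutPartitionValue('import-output','year')}");
        partitions.put("month", "${coord:dataOutPartitionValue('import-output','month')}");

        StringBuilder partitionKeys = new StringBuilder();
        StringBuilder partitionValues = new StringBuilder();
        for (Map.Entry<String, String> e : partitions.entrySet()) {
            partitionKeys.append(e.getKey()).append(',');
            partitionValues.append(e.getValue()).append(',');
        }
        if (!partitions.isEmpty()) {
            // Drop the trailing comma, as the builders do with setLength().
            partitionKeys.setLength(partitionKeys.length() - 1);
            partitionValues.setLength(partitionValues.length() - 1);
        }

        System.out.println("--hcatalog-partition-keys " + partitionKeys
                + " --hcatalog-partition-values " + partitionValues);
    }
}
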
