Repository: falcon
Updated Branches:
  refs/heads/master 40d943637 -> 752511bbe
FALCON-2093 Support TD connector for Database Import and Export

Author: Venkatesan Ramachandran <[email protected]>

Reviewers: "Ying Zheng <[email protected]>, Peeyush B <[email protected]>"

Closes #236 from vramachan/FALCON-2093.TdConnector

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/752511bb
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/752511bb
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/752511bb

Branch: refs/heads/master
Commit: 752511bbe4da85f96e67a0520fb7560e7434b580
Parents: 40d9436
Author: Venkatesan Ramachandran <[email protected]>
Authored: Thu Jul 21 11:42:42 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Thu Jul 21 11:42:42 2016 -0700

----------------------------------------------------------------------
 .../apache/falcon/entity/DatasourceHelper.java  |  2 +-
 .../importexport/td-connector-datasource.xml    | 62 +++++++++++++++++
 .../td-connector-feed-export-filesystem.xml     | 60 +++++++++++++++++
 .../td-connector-feed-import-filesystem.xml     | 70 ++++++++++++++++++++
 .../oozie/DatabaseExportWorkflowBuilder.java    |  1 +
 .../oozie/DatabaseImportWorkflowBuilder.java    |  1 +
 .../apache/falcon/oozie/ImportExportCommon.java |  8 +++
 7 files changed, 203 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
index e035bb5..1479133 100644
--- a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
@@ -412,7 +412,7 @@ public final class DatasourceHelper {
       returns data store properties
      */
- private static Map<String, String> getDatasourceProperties(final Datasource datasource) { + public static Map<String, String> getDatasourceProperties(final Datasource datasource) { Map<String, String> returnProps = new HashMap<String, String>(); if (datasource.getProperties() != null) { for (Property prop : datasource.getProperties().getProperties()) { http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/examples/entity/importexport/td-connector-datasource.xml ---------------------------------------------------------------------- diff --git a/examples/entity/importexport/td-connector-datasource.xml b/examples/entity/importexport/td-connector-datasource.xml new file mode 100755 index 0000000..c1c14be --- /dev/null +++ b/examples/entity/importexport/td-connector-datasource.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<datasource colo="west-coast" description="Teradata database on west coast" type="teradata" name="td-db-connector" xmlns="uri:falcon:datasource:0.1"> + <tags>[email protected], [email protected]</tags> + <interfaces> + + <!-- ***** read interface ***** --> + <interface type="readonly" endpoint="jdbc:teradata://10.10.20.13/DATABASE=retail"> + <credential type="password-text"> + <userName>dbc</userName> + <passwordText>dbc</passwordText> + </credential> + </interface> + + <!-- ***** write interface ***** --> + <interface type="write" endpoint="jdbc:teradata://10.10.20.13/DATABASE=retail"> + <credential type="password-text"> + <userName>dbc</userName> + <passwordText>dbc</passwordText> + </credential> + </interface> + + <!-- ***** default credential ***** --> + <credential type="password-alias"> + <userName>dbc</userName> + <passwordAlias> + <alias>sqoop.password.alias</alias> + <providerPath>hdfs://c6401.ambari.apache.org:8020/user/ambari-qa/sqoop_password.jceks</providerPath> + </passwordAlias> + </credential> + + </interfaces> + + <driver> + <clazz>com.teradata.jdbc.TeraDriver</clazz> + <jar>/user/oozie/share/lib/lib_20160628052542/sqoop/terajdbc4.jar</jar> + <jar>/user/oozie/share/lib/lib_20160628052542/sqoop/tdgssconfig.jar</jar> + </driver> + + <properties> + <property name="connection-manager" value="org.apache.sqoop.teradata.TeradataConnManager"/> + </properties> +</datasource> + http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/examples/entity/importexport/td-connector-feed-export-filesystem.xml ---------------------------------------------------------------------- diff --git a/examples/entity/importexport/td-connector-feed-export-filesystem.xml b/examples/entity/importexport/td-connector-feed-export-filesystem.xml new file mode 100755 index 0000000..11bebfe --- /dev/null +++ b/examples/entity/importexport/td-connector-feed-export-filesystem.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache 
Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!-- + A feed representing Hourly customer email data retained for 90 days + --> +<feed description="Raw customer email feed" name="td-connector-feed-export-filesystem" xmlns="uri:falcon:feed:0.1"> + <tags>externalSystem=USWestEmailServers,classification=secure</tags> + <groups>DataImportPipeline</groups> + <frequency>minutes(10)</frequency> + <late-arrival cut-off="hours(4)"/> + <clusters> + <cluster name="primaryCluster" type="source"> + <validity start="2016-04-10T00:00Z" end="2017-03-31T00:00Z"/> + <retention limit="days(90)" action="delete"/> + <export> + <target name="td-db-connector" tableName="db_export_fs"> + <load type="allowinsert"/> + </target> + </export> + </cluster> + </clusters> + + <locations> + <location type="data" path="/user/ambari-qa/falcon/demo/primary/td-connector-feed-import-filesystem/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}"/> + <location type="stats" path="/none"/> + <location type="meta" path="/none"/> + </locations> + + <ACL owner="ambari-qa" group="users" permission="0755"/> + <schema location="/none" provider="none"/> + + <lifecycle> + <retention-stage> + <frequency>days(1)</frequency> + <queue>default</queue> + <priority>LOW</priority> + <properties> + <property 
name="retention.policy.agebaseddelete.limit" value="hours(12)"></property> + </properties> + </retention-stage> + </lifecycle> +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/examples/entity/importexport/td-connector-feed-import-filesystem.xml ---------------------------------------------------------------------- diff --git a/examples/entity/importexport/td-connector-feed-import-filesystem.xml b/examples/entity/importexport/td-connector-feed-import-filesystem.xml new file mode 100755 index 0000000..899c45d --- /dev/null +++ b/examples/entity/importexport/td-connector-feed-import-filesystem.xml @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<!-- + A feed representing Hourly customer email data retained for 90 days + --> +<feed description="Raw customer email feed" name="td-connector-feed-import-filesystem" xmlns="uri:falcon:feed:0.1"> + <tags>externalSystem=USWestEmailServers,classification=secure</tags> + <groups>DataImportPipeline</groups> + <frequency>minutes(10)</frequency> + <late-arrival cut-off="hours(4)"/> + <clusters> + <cluster name="primaryCluster" type="source"> + <validity start="2016-04-10T00:00Z" end="2017-03-31T00:00Z"/> + <retention limit="days(90)" action="delete"/> + <import> + <source name="td-db-connector" tableName="db_raw_data"> + <extract type="full"> + <mergepolicy>snapshot</mergepolicy> + </extract> + <fields> + <includes> + <field>id</field> + <field>name</field> + </includes> + </fields> + </source> + <arguments> + <argument name="--split-by" value="id"/> + <argument name="--num-mappers" value="2"/> + </arguments> + </import> + </cluster> + </clusters> + + <locations> + <location type="data" path="/user/ambari-qa/falcon/demo/primary/td-connector-feed-import-filesystem/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}"/> + <location type="stats" path="/none"/> + <location type="meta" path="/none"/> + </locations> + + <ACL owner="ambari-qa" group="users" permission="0755"/> + <schema location="/none" provider="none"/> + + <lifecycle> + <retention-stage> + <frequency>days(1)</frequency> + <queue>default</queue> + <priority>LOW</priority> + <properties> + <property name="retention.policy.agebaseddelete.limit" value="hours(12)"></property> + </properties> + </retention-stage> + </lifecycle> +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java 
index 93f3d1f..6468415 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java @@ -96,6 +96,7 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName(feedCluster)); + ImportExportCommon.buildConnectionManagerArg(sqoopArgs, datasource); ImportExportCommon.buildDriverArgs(sqoopArgs, datasource).append(ImportExportCommon.ARG_SEPARATOR); ImportExportCommon.buildConnectArg(sqoopArgs, DatasourceHelper.getWriteEndpoint(datasource)) .append(ImportExportCommon.ARG_SEPARATOR); http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java index 77d5462..44562f2 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java @@ -98,6 +98,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster)); + ImportExportCommon.buildConnectionManagerArg(sqoopArgs, datasource); ImportExportCommon.buildDriverArgs(sqoopArgs, datasource).append(ImportExportCommon.ARG_SEPARATOR); ImportExportCommon.buildConnectArg(sqoopArgs, DatasourceHelper.getReadOnlyEndpoint(datasource)) 
                 .append(ImportExportCommon.ARG_SEPARATOR);

http://git-wip-us.apache.org/repos/asf/falcon/blob/752511bb/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
index 3e8ca73..f5f5ccf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
@@ -129,4 +129,12 @@ public final class ImportExportCommon {
         return builder.append("--table").append(ImportExportCommon.ARG_SEPARATOR)
                 .append(tableName);
     }
+
+    public static void buildConnectionManagerArg(StringBuilder sqoopArgs, Datasource datasource) {
+        Map<String, String> dsProps = DatasourceHelper.getDatasourceProperties(datasource);
+        if (dsProps.containsKey("connection-manager")) {
+            sqoopArgs.append("--connection-manager").append(ImportExportCommon.ARG_SEPARATOR)
+                    .append(dsProps.get("connection-manager")).append(ImportExportCommon.ARG_SEPARATOR);
+        }
+    }
 }
