Repository: falcon Updated Branches: refs/heads/master 35006fe32 -> 89040a296
http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml ---------------------------------------------------------------------- diff --git a/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml b/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml new file mode 100644 index 0000000..34424c9 --- /dev/null +++ b/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml @@ -0,0 +1,47 @@ +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * --> + +<action name="db-import-sqoop" xmlns='uri:oozie:workflow:0.3'> + <sqoop xmlns="uri:oozie:sqoop-action:0.3"> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>mapred.compress.map.output</name> + <value>true</value> + </property> + <!-- Assuming the connectors are in oozie share lib --> + <property> + <!-- Will enable using sharelib --> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + </configuration> + <command>${sqoopCommand}</command> + </sqoop> + <ok to="end"/> + <error to="fail"/> +</action> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/resources/action/post-process.xml ---------------------------------------------------------------------- diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml index df0d286..fa5d804 100644 --- a/oozie/src/main/resources/action/post-process.xml +++ b/oozie/src/main/resources/action/post-process.xml @@ -88,6 +88,8 @@ <arg>${falconInputFeeds}</arg> <arg>-falconInPaths</arg> <arg>${falconInPaths}</arg> + <arg>-datasource</arg> + <arg>${datasource == 'NA' ? 
'IGNORE' : datasource}</arg> </java> <ok to="end"/> <error to="fail"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 87c55e3..6f2c480 100644 --- a/pom.xml +++ b/pom.xml @@ -307,6 +307,9 @@ <exclude>**/maven-eclipse.xml</exclude> <exclude>**/.externalToolBuilders/**</exclude> <exclude>html5-ui/**</exclude> + <exclude>**/db1.log</exclude> + <exclude>**/db1.properties</exclude> + <exclude>**/db1.script</exclude> </excludes> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index 9e4dc8f..0999c36 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -568,6 +568,9 @@ <exclude>**/data.txt</exclude> <exclude>**/maven-eclipse.xml</exclude> <exclude>**/.externalToolBuilders/**</exclude> + <exclude>**/db1.log</exclude> + <exclude>**/db1.properties</exclude> + <exclude>**/db1.script</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java new file mode 100644 index 0000000..8cc1273 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.Map;
+
+/**
+ * Integration test for feed import: submits a cluster, an HSQLDB-backed datasource and an import feed.
+ */
+@Test
+public class FeedImportIT {
+    public static final Log LOG = LogFactory.getLog(FeedImportIT.class.getName());
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        // Start the embedded HSQLDB server, set the SA password and seed the CUSTOMER table.
+        HsqldbTestUtils.start();
+        HsqldbTestUtils.changeSAPassword("sqoop");
+        HsqldbTestUtils.createAndPopulateCustomerTable();
+
+        TestContext.cleanupStore();
+        TestContext.prepare();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        HsqldbTestUtils.tearDown();
+    }
+
+    @Test
+    public void testFeedImportHSql() throws Exception {
+        Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+    }
+
+    @Test
+    public void testSqoopImport() throws Exception {
+        Map<String, String> overlay = submitClusterAndDatasource();
+
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        Assert.assertEquals(0, execute("entity -submitAndSchedule -type feed -file " + filePath));
+    }
+
+    @Test
+    public void testSqoopImportDeleteDatasource() throws Exception {
+        Map<String, String> overlay = submitClusterAndDatasource();
+
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        Assert.assertEquals(0, execute("entity -submit -type feed -file " + filePath));
+
+        // The submitted feed still references datasource-test, so deleting it is expected to fail.
+        Assert.assertEquals(-1, execute("entity -delete -type datasource -name datasource-test"));
+    }
+
+    /**
+     * Submits a fresh cluster and the HSQLDB datasource, asserting that both submissions succeed.
+     *
+     * @return the overlay used, so that dependent entities can target the same cluster
+     */
+    private Map<String, String> submitClusterAndDatasource() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        Assert.assertEquals(0, execute("entity -submit -type cluster -file " + filePath));
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE, overlay);
+        Assert.assertEquals(0, execute("entity -submit -type datasource -file " + filePath));
+        return overlay;
+    }
+
+    /** Logs the CLI command and runs it via TestContext.executeWithURL, returning its result. */
+    private static int execute(String command) throws Exception {
+        LOG.info(command);
+        return TestContext.executeWithURL(command);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index d067dee..0697b3d 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -90,6 +90,9 @@ import java.util.regex.Pattern; public class TestContext { public static final String FEED_TEMPLATE1 = "/feed-template1.xml"; public static final String FEED_TEMPLATE2 = "/feed-template2.xml"; + public static final String FEED_TEMPLATE3 = "/feed-template3.xml"; + + public static final String DATASOURCE_TEMPLATE = "/datasource-template.xml"; public static final String CLUSTER_TEMPLATE = "/cluster-template.xml"; http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java ---------------------------------------------------------------------- diff --git
a/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
new file mode 100644
index 0000000..a92629f
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hsqldb.Server;
+
+/**
+ * Create a simple hsqldb server and schema to use for testing.
+ */
+public final class HsqldbTestUtils {
+
+    public static final Log LOG = LogFactory.getLog(HsqldbTestUtils.class.getName());
+
+    // Singleton server instance: null until start() and again after stop().
+    private static Server server;
+
+    private static final String IN_MEM = "mem:/";
+
+    // Default host part of the JDBC URL. datasource-template.xml connects to
+    // jdbc:hsqldb:localhost/db1, so "localhost" must stay the default.
+    private static final String DEFAULT_HOST = "localhost";
+
+    private static boolean inMemoryDB = IN_MEM.equals(getServerHost());
+
+    private HsqldbTestUtils() {}
+
+    /**
+     * Returns the host part of the JDBC URL, always ending in '/'. It is read from the
+     * hsql.server.host system property (default "localhost"); "mem:/" selects an in-memory database.
+     */
+    public static String getServerHost() {
+        String host = System.getProperty("hsql.server.host", DEFAULT_HOST);
+        if (!host.endsWith("/")) {
+            host += "/";
+        }
+        return host;
+    }
+
+    // Database name can be overridden with the hsql.database.name system property.
+    private static final String DATABASE_NAME = System.getProperty("hsql.database.name", "db1");
+    private static final String CUSTOMER_TABLE_NAME = "CUSTOMER";
+    private static final String DB_URL = "jdbc:hsqldb:" + getServerHost() + DATABASE_NAME;
+    private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+    public static String getUrl() {
+        return DB_URL;
+    }
+
+    public static String getDatabaseName() {
+        return DATABASE_NAME;
+    }
+
+    /**
+     * Starts the server unless it is already running. Data is stored in
+     * ${test.build.data}/falcon/testdb.file (default /tmp/), or in memory when the host is "mem:/".
+     */
+    public static void start() {
+        if (null != server) {
+            return;
+        }
+        LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME);
+        String dbLocation = IN_MEM;
+        if (!inMemoryDB) {
+            dbLocation = System.getProperty("test.build.data", "/tmp/") + "/falcon/testdb.file";
+        }
+        server = new Server();
+        server.setDatabaseName(0, DATABASE_NAME);
+        server.putPropertiesFromString("database.0=" + dbLocation + ";no_system_exit=true;");
+        server.start();
+        LOG.info("Started server with url=" + DB_URL);
+    }
+
+    /** Stops the server, if running, and forgets it so that start() can launch a new one. */
+    public static void stop() {
+        if (null != server) {
+            server.stop();
+            server = null;
+        }
+    }
+
+    /** Drops every listed table and then stops the server. */
+    public static void tearDown() throws SQLException {
+        dropExistingSchema();
+        stop();
+    }
+
+    /**
+     * Sets the password of the SA user, connecting as SA with an empty password.
+     *
+     * @param passwd the new password; trusted test input, inlined into the SQL statement
+     */
+    public static void changeSAPassword(String passwd) throws Exception {
+        LOG.info("Changing password for SA");
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnectionSystem();
+            st = connection.createStatement();
+            st.executeUpdate("SET PASSWORD \"" + passwd + "\"");
+            connection.commit();
+        } finally {
+            close(st, connection);
+        }
+    }
+
+    private static Connection getConnectionSystem() throws SQLException {
+        return getConnection("SA", "");
+    }
+
+    private static Connection getConnection() throws SQLException {
+        return getConnection("SA", "sqoop");
+    }
+
+    /**
+     * Opens a connection to DB_URL with auto-commit disabled.
+     *
+     * @throws SQLException if the JDBC driver class is missing or the connection cannot be opened
+     */
+    private static Connection getConnection(String user, String password) throws SQLException {
+        try {
+            Class.forName(DRIVER_CLASS);
+        } catch (ClassNotFoundException cnfe) {
+            // Fail here instead of returning null, which every caller would dereference.
+            LOG.error("Could not get connection; driver class not found: " + DRIVER_CLASS);
+            throw new SQLException("Driver class not found: " + DRIVER_CLASS, cnfe);
+        }
+        Connection connection = DriverManager.getConnection(DB_URL, user, password);
+        connection.setAutoCommit(false);
+        return connection;
+    }
+
+    /**
+     * Returns database URL for the server instance.
+     * @return String representation of DB_URL
+     */
+    public static String getDbUrl() {
+        return DB_URL;
+    }
+
+    /** Returns the number of rows in the CUSTOMER table, or 0 if the count query yields no row. */
+    public static int getNumberOfRows() throws SQLException {
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnection();
+            st = connection.createStatement();
+            // The ResultSet is released when its Statement is closed in close().
+            ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM " + CUSTOMER_TABLE_NAME);
+            int rowCount = 0;
+            if (rs.next()) {
+                rowCount = rs.getInt(1);
+            }
+            return rowCount;
+        } finally {
+            close(st, connection);
+        }
+    }
+
+    /** Drops and recreates the CUSTOMER table, then inserts four fixed rows. */
+    public static void createAndPopulateCustomerTable() throws SQLException, ClassNotFoundException {
+        LOG.info("createAndPopulateCustomerTable");
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnection();
+            st = connection.createStatement();
+            st.executeUpdate("DROP TABLE " + CUSTOMER_TABLE_NAME + " IF EXISTS");
+            st.executeUpdate("CREATE TABLE " + CUSTOMER_TABLE_NAME
+                    + "(id INT NOT NULL PRIMARY KEY, name VARCHAR(64))");
+
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(1, 'Apple')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(2, 'Blackberry')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(3, 'Caterpillar')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(4, 'DuPont')");
+
+            connection.commit();
+        } finally {
+            close(st, connection);
+        }
+    }
+
+    /**
+     * Drops every table reported by listTables(); does nothing if the listing failed.
+     */
+    public static void dropExistingSchema() throws SQLException {
+        String[] tables = listTables();
+        if (null == tables) {
+            return;
+        }
+        Connection conn = getConnection();
+        try {
+            for (String table : tables) {
+                Statement s = conn.createStatement();
+                try {
+                    s.executeUpdate("DROP TABLE " + table);
+                    conn.commit();
+                } finally {
+                    s.close();
+                }
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Lists the names of all tables of type TABLE.
+     *
+     * @return the table names, or null if the database metadata could not be read
+     */
+    public static String[] listTables() {
+        Connection conn = null;
+        ResultSet results = null;
+        String[] tableTypes = {"TABLE"};
+        try {
+            try {
+                conn = getConnection();
+                DatabaseMetaData metaData = conn.getMetaData();
+                results = metaData.getTables(null, null, null, tableTypes);
+            } catch (SQLException sqlException) {
+                LOG.error("Error reading database metadata: " + sqlException.toString(), sqlException);
+                return null;
+            }
+
+            if (null == results) {
+                return null;
+            }
+
+            try {
+                ArrayList<String> tables = new ArrayList<String>();
+                while (results.next()) {
+                    tables.add(results.getString("TABLE_NAME"));
+                }
+                return tables.toArray(new String[tables.size()]);
+            } catch (SQLException sqlException) {
+                LOG.error("Error reading from database: " + sqlException.toString(), sqlException);
+                return null;
+            }
+        } finally {
+            releaseMetadataResources(results, conn);
+        }
+    }
+
+    /**
+     * Closes the metadata ResultSet and commits on the same connection (auto-commit is disabled
+     * by getConnection), then closes the connection. Failures are logged, not propagated.
+     */
+    private static void releaseMetadataResources(ResultSet results, Connection conn) {
+        try {
+            if (null != results) {
+                results.close();
+            }
+            if (null != conn) {
+                conn.commit();
+            }
+        } catch (SQLException sqlE) {
+            LOG.error("Exception closing ResultSet: " + sqlE.toString(), sqlE);
+        } finally {
+            closeQuietly(conn);
+        }
+    }
+
+    /** Closes the connection if non-null, logging instead of propagating any failure. */
+    private static void closeQuietly(Connection conn) {
+        if (null == conn) {
+            return;
+        }
+        try {
+            conn.close();
+        } catch (SQLException sqlE) {
+            LOG.error("Exception closing Connection: " + sqlE.toString(), sqlE);
+        }
+    }
+
+    /**
+     * Closes the statement and then the connection; either may be null. The connection is
+     * closed even when closing the statement fails, and any close failure is propagated.
+     */
+    private static void close(Statement st, Connection connection) throws SQLException {
+        try {
+            if (null != st) {
+                st.close();
+            }
+        } finally {
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/resources/datasource-template.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/datasource-template.xml b/webapp/src/test/resources/datasource-template.xml new file
mode 100644 index 0000000..fb7a329 --- /dev/null +++ b/webapp/src/test/resources/datasource-template.xml @@ -0,0 +1,46 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<datasource colo="##colo##" description="" type="hsql" name="datasource-test" xmlns="uri:falcon:datasource:0.1"> + <interfaces> + <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1"> + <credential type="password-text"> + <userName>SA</userName> + <passwordText></passwordText> + </credential> + </interface> + + <interface type="write" endpoint="jdbc:hsqldb:localhost/db1"> + <credential type="password-text"> + <userName>SA</userName> + <passwordText>sqoop</passwordText> + </credential> + </interface> + + <credential type="password-text"> + <userName>SA</userName> + <passwordText>sqoop</passwordText> + </credential> + </interfaces> + + <driver> + <clazz>org.hsqldb.jdbcDriver</clazz> + <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar> + </driver> +</datasource> http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/resources/feed-template3.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/feed-template3.xml 
b/webapp/src/test/resources/feed-template3.xml new file mode 100644 index 0000000..a6c1d6b --- /dev/null +++ b/webapp/src/test/resources/feed-template3.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1"> + <groups>input</groups> + + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(6)"/> + + <clusters> + <cluster name="##cluster##" type="source"> + <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/> + <retention limit="hours(24)" action="delete"/> + <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE --> + <import> + <source name="datasource-test" tableName="simple"> + <extract type="full"> + <mergepolicy>snapshot</mergepolicy> + </extract> + <fields> + <includes> + <field>id</field> + <field>name</field> + </includes> + </fields> + </source> + <arguments> + <argument name="--split-by" value="id"/> + <argument name="--num-mappers" value="2"/> + </arguments> + </import> + </cluster> + </clusters> + + <locations> + <location type="data" path="/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/"/> + <location 
type="stats" path="/projects/falcon/clicksStats"/> + <location type="meta" path="/projects/falcon/clicksMetaData"/> + </locations> + + <ACL owner="##user##" group="group" permission="0x755"/> + <schema location="/schema/clicks" provider="protobuf"/> +</feed>
