Updated Branches: refs/heads/sqoop2 c8b4581f2 -> d7bd4ad43
SQOOP-945: Integration: Auxiliary methods that will support export (Jarek Jarcec Cecho via Kate Ting) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d7bd4ad4 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d7bd4ad4 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d7bd4ad4 Branch: refs/heads/sqoop2 Commit: d7bd4ad436e2b5487257683decd170eb31757e20 Parents: c8b4581 Author: Kate Ting <[email protected]> Authored: Sun Jun 16 01:48:31 2013 -0400 Committer: Kate Ting <[email protected]> Committed: Sun Jun 16 01:48:31 2013 -0400 ---------------------------------------------------------------------- .../apache/sqoop/test/db/DatabaseProvider.java | 129 ++++++++++++++++++- .../sqoop/integration/TomcatTestCase.java | 13 ++ .../connector/ConnectorTestCase.java | 76 ++++++++++- .../connector/jdbc/generic/TableExportTest.java | 82 ++++++++++++ 4 files changed, 288 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7bd4ad4/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java b/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java index 72d1a95..e0cc7c9 100644 --- a/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java +++ b/test/src/main/java/org/apache/sqoop/test/db/DatabaseProvider.java @@ -22,6 +22,7 @@ import org.apache.log4j.Logger; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedList; @@ -195,6 +196,27 @@ abstract public class DatabaseProvider { } /** + * Execute given query in a new statement object and return corresponding + * result set. Caller is responsible for closing both ResultSet and Statement + * object! + * + * @param query Query to execute + * @return Generated ResultSet + */ + public ResultSet executeQuery(String query) { + LOG.info("Executing query: " + query); + Statement stmt = null; + + try { + stmt = connection.createStatement(); + return stmt.executeQuery(query); + } catch (SQLException e) { + LOG.error("Error in executing query", e); + throw new RuntimeException("Error in executing query", e); + } + } + + /** * Create new table. * * @param name Table name @@ -243,13 +265,7 @@ abstract public class DatabaseProvider { List<String> valueList = new LinkedList<String>(); for(Object value : values) { - if(value == null) { - valueList.add(nullConstant()); - } else if(value.getClass() == String.class) { - valueList.add(escapeValueString((String)value)); - } else { - valueList.add(value.toString()); - } + valueList.add(convertObjectToQueryString(value)); } sb.append(StringUtils.join(valueList, ", ")); @@ -259,6 +275,62 @@ abstract public class DatabaseProvider { } /** + * Return rows that match given conditions. + * + * @param tableName Table name + * @param conditions Conditions in form of double values - column name and value, for example: "id", 1 or "last_update_date", null + * @return ResultSet with given criteria + */ + public ResultSet getRows(String tableName, Object []conditions) { + // Columns are in form of two strings - name and value + if(conditions.length % 2 != 0) { + throw new RuntimeException("Incorrect number of parameters."); + } + + StringBuilder sb = new StringBuilder("SELECT * FROM "); + sb.append(escapeTableName(tableName)); + + List<String> conditionList = new LinkedList<String>(); + for(int i = 0; i < conditions.length; i += 2) { + Object columnName = conditions[i]; + Object value = conditions[i + 1]; + + if( !(columnName instanceof String)) { + throw new RuntimeException("Each odd item should be a string with column name."); + } + + if(value == null) { + conditionList.add(escapeColumnName((String) columnName) + " IS NULL"); + } else { + conditionList.add(escapeColumnName((String) columnName) + " = " + convertObjectToQueryString(value)); + } + } + + if(conditionList.size() != 0) { + sb.append(" WHERE ").append(StringUtils.join(conditionList, " AND ")); + } + + return executeQuery(sb.toString()); + } + + /** + * Convert given object to it's representation that can be safely used inside + * query. + * + * @param value Value to convert + * @return Query safe string representation + */ + public String convertObjectToQueryString(Object value) { + if(value == null) { + return nullConstant(); + } else if(value.getClass() == String.class) { + return escapeValueString((String)value); + } else { + return value.toString(); + } + } + + /** * Drop table. * * Any exceptions will be ignored. @@ -277,6 +349,49 @@ abstract public class DatabaseProvider { } /** + * Return number of rows from given table. + * + * @param tableName Table name + * @return Number of rows + */ + public long rowCount(String tableName) { + StringBuilder sb = new StringBuilder("SELECT COUNT(*) FROM "); + sb.append(escapeTableName(tableName)); + + ResultSet rs = null; + try { + rs = executeQuery(sb.toString()); + if(!rs.next()) { + throw new RuntimeException("Row count query did not returned any rows."); + } + + return rs.getLong(1); + } catch (SQLException e) { + LOG.error("Can't get number of rows: ", e); + throw new RuntimeException("Can't get number of rows: ", e); + } finally { + closeResultSetWithStatement(rs); + } + } + + /** + * Close given result set (if not null) and associated statement. + * + * @param rs ResultSet to close. + */ + public void closeResultSetWithStatement(ResultSet rs) { + if(rs != null) { + try { + Statement stmt = rs.getStatement(); + rs.close(); + stmt.close(); + } catch (SQLException e) { + LOG.info("Ignoring exception: ", e); + } + } + } + + /** * Load class. * * @param className Class name http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7bd4ad4/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java b/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java index 6eb3184..fa2c2b4 100644 --- a/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java +++ b/test/src/test/java/org/apache/sqoop/integration/TomcatTestCase.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.integration; +import org.apache.commons.io.FileUtils; import org.apache.log4j.Logger; import org.apache.commons.lang.StringUtils; import org.apache.sqoop.client.SqoopClient; @@ -181,4 +182,16 @@ abstract public class TomcatTestCase { fail("Output do not match expectations."); } } + + /** + * Create mapreduce input file with specified content. + * + * @param filename Input file name + * @param lines Individual lines that should be written into the file + * @throws IOException + */ + protected void createInputMapreduceFile(String filename, String...lines) throws IOException { + File outputFile = new File(getMapreduceDirectory(), filename); + FileUtils.writeLines(outputFile, Arrays.asList(lines)); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7bd4ad4/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java b/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java index cdc3bd2..d4e432d 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/ConnectorTestCase.java @@ -31,8 +31,12 @@ import org.apache.sqoop.validation.Status; import org.junit.AfterClass; import org.junit.BeforeClass; +import java.sql.ResultSet; +import java.sql.SQLException; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.fail; /** * Base test case for connector testing. @@ -74,6 +78,10 @@ abstract public class ConnectorTestCase extends TomcatTestCase { provider.insertRow(getTableName(), values); } + protected long rowCount() { + return provider.rowCount(getTableName()); + } + /** * Fill connection form based on currently active provider. * @@ -103,14 +111,31 @@ abstract public class ConnectorTestCase extends TomcatTestCase { } /** + * Fill input form. Mapreduce input directory will be set to default test value. + * + * @param job MJOb object to fill + */ + protected void fillInputForm(MJob job) { + MFormList forms = job.getFrameworkPart(); + forms.getStringInput("input.inputDirectory").setValue(getMapreduceDirectory()); + } + + /** * Create table cities. */ + protected void createTableCities() { + createTable("id", + "id", "int", + "country", "varchar(50)", + "city", "varchar(50)" + ); + } + + /** + * Create table cities and load few rows. + */ protected void createAndLoadTableCities() { - createTable("id", - "id", "int not null", - "country", "varchar(50)", - "city", "varchar(50)" - ); + createTableCities(); insertRow(1, "USA", "San Francisco"); insertRow(2, "USA", "Sunnyvale"); insertRow(3, "Czech Republic", "Brno"); @@ -118,6 +143,47 @@ abstract public class ConnectorTestCase extends TomcatTestCase { } /** + * Assert row in testing table. + * + * @param conditions Conditions in form that are expected by the database provider + * @param values Values that are expected in the table (with corresponding types) + */ + protected void assertRow(Object []conditions, Object ...values) { + ResultSet rs = provider.getRows(getTableName(), conditions); + + try { + if(! rs.next()) { + fail("No rows found."); + } + + int i = 1; + for(Object expectedValue : values) { + Object actualValue = rs.getObject(i); + assertEquals("Columns do not match on position: " + i, expectedValue, actualValue); + i++; + } + + if(rs.next()) { + fail("Found more than one row."); + } + } catch (SQLException e) { + LOG.error("Unexpected SQLException", e); + fail("Unexpected SQLException: " + e); + } finally { + provider.closeResultSetWithStatement(rs); + } + } + + /** + * Assert row in table "cities". + * + * @param values Values that are expected + */ + protected void assertRowInCitiesTable(Object ... values) { + assertRow(new Object[]{"id", values[0]}, values); + } + + /** * Create connection. * * With asserts to make sure that it was created correctly. http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7bd4ad4/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java new file mode 100644 index 0000000..b920655 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.jdbc.generic; + +import org.apache.log4j.Logger; +import org.apache.sqoop.integration.connector.ConnectorTestCase; +import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MFormList; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MSubmission; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class TableExportTest extends ConnectorTestCase { + + private static final Logger LOG = Logger.getLogger(TableImportTest.class); + + @Test + public void testBasicImport() throws Exception { + createTableCities(); + createInputMapreduceFile("input-0001", + "1,'USA','San Francisco'", + "2,'USA','Sunnyvale'", + "3,'Czech Republic','Brno'", + "4,'USA','Palo Alto'" + ); + + // Connection creation + MConnection connection = getClient().newConnection(1L); + fillConnectionForm(connection); + createConnection(connection); + + // Job creation + MJob job = getClient().newJob(connection.getPersistenceId(), MJob.Type.EXPORT); + + // Connector values + MFormList forms = job.getConnectorPart(); + forms.getStringInput("table.tableName").setValue(provider.escapeTableName(getTableName())); + fillInputForm(job); + createJob(job); + + MSubmission submission = getClient().startSubmission(job.getPersistenceId()); + assertTrue("Unexpected value: " + submission.getStatus(), submission.getStatus().isRunning()); + + // Wait until the job finish - this active waiting will be removed once + // Sqoop client API will get blocking support. + do { + Thread.sleep(5000); + submission = getClient().getSubmissionStatus(job.getPersistenceId()); + } while(submission.getStatus().isRunning()); + + assertEquals(4L, rowCount()); + assertRowInCitiesTable(1, "USA", "San Francisco"); + assertRowInCitiesTable(2, "USA", "Sunnyvale"); + assertRowInCitiesTable(3, "Czech Republic", "Brno"); + assertRowInCitiesTable(4, "USA", "Palo Alto"); + + // Clean up testing table + dropTable(); + } + +}
