Updated Branches: refs/heads/sqoop2 08a829fd6 -> e33d69347
SQOOP-974: Sqoop2: Add staging table support to generic jdbc export job (Raghav Kumar Gautam via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e33d6934 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e33d6934 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e33d6934 Branch: refs/heads/sqoop2 Commit: e33d6934761b66f24a7d197aacc3d40ea00f8155 Parents: 08a829f Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Sep 11 01:11:54 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Sep 11 01:11:54 2013 -0700 ---------------------------------------------------------------------- .../jdbc/GenericJdbcConnectorError.java | 5 + .../connector/jdbc/GenericJdbcExecutor.java | 75 +++++++++ .../jdbc/GenericJdbcExportDestroyer.java | 28 ++++ .../jdbc/GenericJdbcExportInitializer.java | 29 +++- .../connector/jdbc/GenericJdbcValidator.java | 11 ++ .../jdbc/configuration/ExportTableForm.java | 2 + .../generic-jdbc-connector-resources.properties | 8 + .../connector/jdbc/GenericJdbcExecutorTest.java | 88 ++++++++++ .../connector/jdbc/TestExportInitializer.java | 165 ++++++++++++++++++- .../generic/exports/TableStagedExportTest.java | 77 +++++++++ 10 files changed, 486 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java index 671bb4a..2b1a0ad 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java @@ -73,6 +73,11 @@ public enum GenericJdbcConnectorError implements ErrorCode { /** Can't fetch schema */ GENERIC_JDBC_CONNECTOR_0016("Can't fetch schema"), + /** Neither the table name nor the table sql are specified. */ + GENERIC_JDBC_CONNECTOR_0017("The stage table is not empty."), + + GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " + + "stage table to destination table."), ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index 75cf9d9..9fd2e4f 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -26,10 +26,14 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; public class GenericJdbcExecutor { + private static final Logger LOG = + Logger.getLogger(GenericJdbcExecutor.class); + private Connection connection; private PreparedStatement preparedStatement; @@ -69,6 +73,77 @@ public class GenericJdbcExecutor { } } + public void deleteTableData(String tableName) { + LOG.info("Deleting all the rows from: " + tableName); + executeUpdate("DELETE FROM " + tableName); + } + + public void migrateData(String fromTable, String toTable) { + String insertQuery = "INSERT INTO " + toTable + + " ( SELECT * FROM " + fromTable + " )"; + Statement stmt = null; + Boolean oldAutoCommit = null; + try { + final long expectedInsertCount = getTableRowCount(fromTable); + oldAutoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + stmt = connection.createStatement(); + final int actualInsertCount = stmt.executeUpdate(insertQuery); + if(expectedInsertCount == actualInsertCount) { + LOG.info("Transferred " + actualInsertCount + " rows of staged data " + + "from: " + fromTable + " to: " + toTable); + connection.commit(); + deleteTableData(fromTable); + connection.commit(); + } else { + LOG.error("Rolling back as number of rows inserted into table: " + + toTable + " was: " + actualInsertCount + " expected: " + + expectedInsertCount); + connection.rollback(); + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0018); + } + } catch(SQLException e) { + LOG.error("Got SQLException while migrating data from: " + fromTable + + " to: " + toTable, e); + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0018, e); + } finally { + if(stmt != null) { + try { + stmt.close(); + } catch(SQLException e) { + LOG.warn("Got SQLException at the time of closing statement.", e); + } + } + if(oldAutoCommit != null) { + try { + connection.setAutoCommit(oldAutoCommit); + } catch(SQLException e) { + LOG.warn("Got SQLException while setting autoCommit mode.", e); + } + } + } + } + + public long getTableRowCount(String tableName) { + ResultSet resultSet = executeQuery("SELECT COUNT(1) FROM " + tableName); + try { + resultSet.next(); + return resultSet.getLong(1); + } catch(SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e); + } finally { + try { + if(resultSet != null) + resultSet.close(); + } catch(SQLException e) { + LOG.warn("Got SQLException while closing resultset.", e); + } + } + } + public void executeUpdate(String sql) { try { Statement statement = connection.createStatement( http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java index 588e236..c5faa09 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java @@ -30,5 +30,33 @@ public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguratio @Override public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { LOG.info("Running generic JDBC connector destroyer"); + + final String tableName = job.table.tableName; + final String stageTableName = job.table.stageTableName; + final boolean stageEnabled = stageTableName != null && + stageTableName.length() > 0; + if(stageEnabled) { + moveDataToDestinationTable(connection, + context.isSuccess(), stageTableName, tableName); + } } + + private void moveDataToDestinationTable(ConnectionConfiguration connectorConf, + boolean success, String stageTableName, String tableName) { + GenericJdbcExecutor executor = + new GenericJdbcExecutor(connectorConf.connection.jdbcDriver, + connectorConf.connection.connectionString, + connectorConf.connection.username, + connectorConf.connection.password); + + if(success) { + LOG.info("Job completed, transferring data from stage table to " + + "destination table."); + executor.migrateData(stageTableName, tableName); + } else { + LOG.warn("Job failed, clearing stage table."); + executor.deleteTableData(stageTableName); + } + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java index 7212843..ef39cdc 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; @@ -33,6 +34,8 @@ import org.apache.sqoop.utils.ClassUtils; public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> { private GenericJdbcExecutor executor; + private static final Logger LOG = + Logger.getLogger(GenericJdbcExportInitializer.class); @Override public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { @@ -75,6 +78,11 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur String schemaName = jobConfig.table.schemaName; String tableName = jobConfig.table.tableName; + String stageTableName = jobConfig.table.stageTableName; + boolean clearStageTable = jobConfig.table.clearStageTable == null ? + false : jobConfig.table.clearStageTable; + final boolean stageEnabled = + stageTableName != null && stageTableName.length() > 0; String tableSql = jobConfig.table.sql; String tableColumns = jobConfig.table.columns; @@ -85,9 +93,28 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur } else if (tableName != null) { // when table name is specified: + if(stageEnabled) { + LOG.info("Stage has been enabled."); + LOG.info("Use stageTable: " + stageTableName + + " with clearStageTable: " + clearStageTable); + + if(clearStageTable) { + executor.deleteTableData(stageTableName); + } else { + long stageRowCount = executor.getTableRowCount(stageTableName); + if(stageRowCount > 0) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017); + } + } + } // For databases that support schemas (IE: postgresql). - String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + final String tableInUse = stageEnabled ? stageTableName : tableName; + String fullTableName = (schemaName == null) ? + executor.delimitIdentifier(tableInUse) : + executor.delimitIdentifier(schemaName) + + "." + executor.delimitIdentifier(tableInUse); if (tableColumns == null) { String[] columns = executor.getQueryColumns("SELECT * FROM " http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java index 4e24517..0c5f6e1 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java @@ -88,6 +88,17 @@ public class GenericJdbcValidator extends Validator { if(configuration.table.tableName != null && configuration.table.sql != null) { validation.addMessage(Status.UNACCEPTABLE, "table", "Both table name and SQL cannot be specified"); } + if(configuration.table.tableName == null && + configuration.table.stageTableName != null) { + validation.addMessage(Status.UNACCEPTABLE, "table", + "Stage table name cannot be specified without specifying table name"); + } + if(configuration.table.stageTableName == null && + configuration.table.clearStageTable != null) { + validation.addMessage(Status.UNACCEPTABLE, "table", + "Clear stage table cannot be specified without specifying name of " + + "the stage table."); + } return validation; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java index a311c06..14a7033 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java @@ -29,4 +29,6 @@ public class ExportTableForm { @Input(size = 2000) public String tableName; @Input(size = 50) public String sql; @Input(size = 50) public String columns; + @Input(size = 2000) public String stageTableName; + @Input public Boolean clearStageTable; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties index 0950e32..fc805df 100644 --- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties +++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties @@ -73,6 +73,14 @@ table.columns.help = Specific columns of a table name or a table SQL table.warehouse.label = Data warehouse table.warehouse.help = The root directory for data +# Stage table name +table.stageTableName.label = Stage table name +table.stageTableName.help = Name of the stage table to use + +# Clear stage table +table.clearStageTable.label = Clear stage table +table.clearStageTable.help = Indicate if the stage table should be cleared + # Table datadir table.dataDirectory.label = Data directory table.dataDirectory.help = The sub-directory under warehouse for data http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java new file mode 100644 index 0000000..e10a5b4 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import junit.framework.TestCase; + +public class GenericJdbcExecutorTest extends TestCase { + private final String table; + private final String emptyTable; + private final GenericJdbcExecutor executor; + + private static final int START = -50; + private static final int NUMBER_OF_ROWS = 974; + + public GenericJdbcExecutorTest() { + table = getClass().getSimpleName().toUpperCase(); + emptyTable = table + "_EMPTY"; + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + } + + @Override + public void setUp() { + if(executor.existTable(emptyTable)) { + executor.executeUpdate("DROP TABLE " + emptyTable); + } + executor.executeUpdate("CREATE TABLE " + + emptyTable + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + + if(executor.existTable(table)) { + executor.executeUpdate("DROP TABLE " + table); + } + executor.executeUpdate("CREATE TABLE " + + table + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + int value = START + i; + String sql = "INSERT INTO " + table + + " VALUES(" + value + ", '" + value + "')"; + executor.executeUpdate(sql); + } + } + + @SuppressWarnings("unchecked") + public void testDeleteTableData() throws Exception { + executor.deleteTableData(table); + assertEquals("Table " + table + " is expected to be empty.", + 0, executor.getTableRowCount(table)); + } + + @SuppressWarnings("unchecked") + public void testMigrateData() throws Exception { + assertEquals("Table " + emptyTable + " is expected to be empty.", + 0, executor.getTableRowCount(emptyTable)); + assertEquals("Table " + table + " is expected to have " + + NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, + executor.getTableRowCount(table)); + + executor.migrateData(table, emptyTable); + + assertEquals("Table " + table + " is expected to be empty.", 0, + executor.getTableRowCount(table)); + assertEquals("Table " + emptyTable + " is expected to have " + + NUMBER_OF_ROWS + " rows.", NUMBER_OF_ROWS, + executor.getTableRowCount(emptyTable)); + } + + @SuppressWarnings("unchecked") + public void testGetTableRowCount() throws Exception { + assertEquals("Table " + table + " is expected to be empty.", + NUMBER_OF_ROWS, executor.getTableRowCount(table)); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java index f83aaa2..d55b0f1 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java @@ -18,19 +18,23 @@ package org.apache.sqoop.connector.jdbc; import junit.framework.TestCase; - import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; public class TestExportInitializer extends TestCase { private final String schemaName; private final String tableName; private final String schemalessTableName; + private final String stageTableName; private final String tableSql; private final String schemalessTableSql; private final String tableColumns; @@ -41,6 +45,8 @@ public class TestExportInitializer extends TestCase { schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; + stageTableName = getClass().getSimpleName().toUpperCase() + + "_STAGE_TABLE"; tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)"; schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)"; tableColumns = "ICOL,VCOL"; @@ -199,4 +205,161 @@ public class TestExportInitializer extends TestCase { assertEquals(dataSql, context.getString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)); } + + private void createTable(String tableName) { + try { + executor.executeUpdate("DROP TABLE " + tableName); + } catch(SqoopException e) { + //Ok to fail as the table might not exist + } + executor.executeUpdate("CREATE TABLE " + tableName + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + + public void testNonExistingStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.tableName = schemalessTableName; + jobConf.table.stageTableName = stageTableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + try { + initializer.initialize(initializerContext, connConf, jobConf); + fail("Initialization should fail for non-existing stage table."); + } catch(SqoopException se) { + //expected + } + } + + @SuppressWarnings("unchecked") + public void testNonEmptyStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.tableName = schemalessTableName; + jobConf.table.stageTableName = stageTableName; + createTable(fullStageTableName); + executor.executeUpdate("INSERT INTO " + fullStageTableName + + " VALUES(1, 1.1, 'one')"); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + try { + initializer.initialize(initializerContext, connConf, jobConf); + fail("Initialization should fail for non-empty stage table."); + } catch(SqoopException se) { + //expected + } + } + + @SuppressWarnings("unchecked") + public void testClearStageTableValidation() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + //specifying clear stage table flag without specifying name of + // the stage table + jobConf.table.tableName = schemalessTableName; + jobConf.table.clearStageTable = false; + GenericJdbcValidator validator = new GenericJdbcValidator(); + Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf); + assertEquals("User should not specify clear stage table flag without " + + "specifying name of the stage table", + Status.UNACCEPTABLE, + validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("table"))); + + jobConf.table.clearStageTable = true; + validation = validator.validateJob(MJob.Type.EXPORT, jobConf); + assertEquals("User should not specify clear stage table flag without " + + "specifying name of the stage table", + Status.UNACCEPTABLE, + validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("table"))); + } + + @SuppressWarnings("unchecked") + public void testStageTableWithoutTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + //specifying stage table without specifying table name + jobConf.table.stageTableName = stageTableName; + jobConf.table.sql = ""; + + GenericJdbcValidator validator = new GenericJdbcValidator(); + Validation validation = validator.validateJob(MJob.Type.EXPORT, jobConf); + assertEquals("Stage table name cannot be specified without specifying " + + "table name", Status.UNACCEPTABLE, validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("table"))); + } + + @SuppressWarnings("unchecked") + public void testClearStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.tableName = schemalessTableName; + jobConf.table.stageTableName = stageTableName; + jobConf.table.clearStageTable = true; + createTable(fullStageTableName); + executor.executeUpdate("INSERT INTO " + fullStageTableName + + " VALUES(1, 1.1, 'one')"); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + assertEquals("Stage table should have been cleared", 0, + executor.getTableRowCount(stageTableName)); + } + + @SuppressWarnings("unchecked") + public void testStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ExportJobConfiguration jobConf = new ExportJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.table.tableName = schemalessTableName; + jobConf.table.stageTableName = stageTableName; + createTable(fullStageTableName); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcExportInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullStageTableName + + " VALUES (?,?,?)"); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e33d6934/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java new file mode 100644 index 0000000..e36437b --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/exports/TableStagedExportTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.jdbc.generic.exports; + +import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MFormList; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.test.data.Cities; +import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * + */ +public class TableStagedExportTest extends ConnectorTestCase { + + @Test + public void testStagedExport() throws Exception { + final String stageTableName = "STAGE_" + getTableName(); + createTableCities(); + createInputMapreduceFile("input-0001", + "1,'USA','San Francisco'", + "2,'USA','Sunnyvale'", + "3,'Czech Republic','Brno'", + "4,'USA','Palo Alto'" + ); + new Cities(provider, stageTableName).createTables(); + // Connection creation + MConnection connection = getClient().newConnection("generic-jdbc-connector"); + fillConnectionForm(connection); + createConnection(connection); + + // Job creation + MJob job = getClient().newJob(connection.getPersistenceId(), + MJob.Type.EXPORT); + + // Connector values + MFormList forms = job.getConnectorPart(); + forms.getStringInput("table.tableName").setValue( + provider.escapeTableName(getTableName())); + forms.getStringInput("table.stageTableName").setValue( + provider.escapeTableName(stageTableName)); + fillInputForm(job); + createJob(job); + + runJob(job); + + assertEquals(0L, provider.rowCount(stageTableName)); + assertEquals(4L, rowCount()); + assertRowInCities(1, "USA", "San Francisco"); + assertRowInCities(2, "USA", "Sunnyvale"); + assertRowInCities(3, "Czech Republic", "Brno"); + assertRowInCities(4, "USA", "Palo Alto"); + + // Clean up testing table + provider.dropTable(stageTableName); + dropTable(); + } + +}
