Repository: sqoop Updated Branches: refs/heads/trunk b2643d509 -> 57278b9e3
SQOOP-3333: Change default behavior of the MS SQL connector to non-resilient. (Fero Szabo via Szabolcs Vasas) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/57278b9e Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/57278b9e Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/57278b9e Branch: refs/heads/trunk Commit: 57278b9e3370dc65cb465e9ed3cb225203dc7eab Parents: b2643d5 Author: Szabolcs Vasas <[email protected]> Authored: Wed Jun 20 12:41:39 2018 +0200 Committer: Szabolcs Vasas <[email protected]> Committed: Wed Jun 20 12:41:39 2018 +0200 ---------------------------------------------------------------------- src/docs/user/connectors.txt | 16 +- .../apache/sqoop/manager/ExportJobContext.java | 8 + .../apache/sqoop/manager/SQLServerManager.java | 163 +++------------- .../SqlServerManagerContextConfigurator.java | 130 +++++++++++++ .../sqlserver/SQLServerManagerImportTest.java | 189 ++++++++++--------- ...TestSqlServerManagerContextConfigurator.java | 121 ++++++++++++ 6 files changed, 392 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/57278b9e/src/docs/user/connectors.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index 7c54071..f1c7aeb 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -127,7 +127,7 @@ Argument Description --------------------------------------------------------------------------------- +\--identity-insert Set IDENTITY_INSERT to ON before \ export insert. -+\--non-resilient+ Don't attempt to recover failed \ ++\--resilient+ Attempt to recover failed \ export operations. +\--schema <name>+ Scheme name that sqoop should use. \ Default is "dbo". @@ -144,14 +144,18 @@ You can allow inserts on columns that have identity. 
For example: $ sqoop export ... --export-dir custom_dir --table custom_table -- --identity-insert ---- -Non-resilient operations -^^^^^^^^^^^^^^^^^^^^^^^^ +Resilient operations +^^^^^^^^^^^^^^^^^^^^ -You can override the default and not use resilient operations during export. -This will avoid retrying failed operations. For example: +You can override the default and use resilient operations during export. +This will retry failed operations, i.e. if the connection gets dropped by +SQL Server, the mapper will try to reconnect and continue from where it was before. +The split-by column has to be specified and it is also required to be unique +and in ascending order. +For example: ---- -$ sqoop export ... --export-dir custom_dir --table custom_table -- --non-resilient +$ sqoop export ... --export-dir custom_dir --table custom_table -- --resilient ---- Schema support http://git-wip-us.apache.org/repos/asf/sqoop/blob/57278b9e/src/java/org/apache/sqoop/manager/ExportJobContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/ExportJobContext.java b/src/java/org/apache/sqoop/manager/ExportJobContext.java index 773cf74..643f4b1 100644 --- a/src/java/org/apache/sqoop/manager/ExportJobContext.java +++ b/src/java/org/apache/sqoop/manager/ExportJobContext.java @@ -33,6 +33,7 @@ public class ExportJobContext { private String jarFile; private SqoopOptions options; private ConnManager manager; + private Class outputFormatClass; public ExportJobContext(final String table, final String jar, final SqoopOptions opts) { @@ -78,5 +79,12 @@ public class ExportJobContext { return this.manager; } + public Class getOutputFormatClass() { + return outputFormatClass; + } + + public void setOutputFormatClass(Class outputFormatClass) { + this.outputFormatClass = outputFormatClass; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/57278b9e/src/java/org/apache/sqoop/manager/SQLServerManager.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java index b136087..c98ad2d 100644 --- a/src/java/org/apache/sqoop/manager/SQLServerManager.java +++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java @@ -31,28 +31,22 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.mapreduce.JdbcUpsertExportJob; -import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat; -import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat; -import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat; -import org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler; import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.mapreduce.JdbcExportJob; import org.apache.sqoop.mapreduce.JdbcUpdateExportJob; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat; import org.apache.sqoop.util.ExportException; import org.apache.sqoop.util.ImportException; import org.apache.sqoop.cli.RelatedOptions; -import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat; -import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat; import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat; /** * Manages connections to SQLServer databases. Requires the SQLServer JDBC * driver. 
*/ -public class SQLServerManager - extends InformationSchemaManager { +public class SQLServerManager extends InformationSchemaManager { public static final String SCHEMA = "schema"; public static final String TABLE_HINTS = "table-hints"; @@ -62,8 +56,6 @@ public class SQLServerManager public static final Log LOG = LogFactory.getLog( SQLServerManager.class.getName()); - // Option set in extra-arguments to disable resiliency and use default mode - public static final String NON_RESILIENT_OPTION = "non-resilient"; // Option to allow inserts on identity columns public static final String IDENTITY_INSERT = "identity-insert"; @@ -88,12 +80,19 @@ public class SQLServerManager */ private boolean identityInserts; + final SqlServerManagerContextConfigurator formatConfigurator; + public SQLServerManager(final SqoopOptions opts) { this(SQLSERVER.getDriverClass(), opts); } public SQLServerManager(final String driver, final SqoopOptions opts) { + this(driver, opts, new SqlServerManagerContextConfigurator()); + } + + public SQLServerManager(final String driver, final SqoopOptions opts, final SqlServerManagerContextConfigurator configurator) { super(driver, opts); + this.formatConfigurator = configurator; // Try to parse extra arguments try { @@ -121,7 +120,7 @@ public class SQLServerManager * import/export. */ javaType = "String"; - }else { + } else { //If none of the above data types match, it returns parent method's //status, which can be null. 
javaType = super.toJavaType(sqlType); @@ -133,8 +132,7 @@ public class SQLServerManager * {@inheritDoc} */ @Override - public void importTable( - org.apache.sqoop.manager.ImportJobContext context) + public void importTable(org.apache.sqoop.manager.ImportJobContext context) throws IOException, ImportException { // We're the correct connection manager context.setConnManager(this); @@ -144,20 +142,9 @@ public class SQLServerManager if (tableHints != null) { configuration.set(TABLE_HINTS_PROP, tableHints); } - if (!isNonResilientOperation()) { - // Enable connection recovery only if split column is provided - SqoopOptions opts = context.getOptions(); - String splitCol = getSplitColumn(opts, context.getTableName()); - if (splitCol != null) { - // Configure SQLServer table import jobs for connection recovery - configureConnectionRecoveryForImport(context); - } else { - // Set our own input format - context.setInputFormat(SqlServerInputFormat.class); - } - } else { - context.setInputFormat(SqlServerInputFormat.class); - } + String splitColumn = getSplitColumn(context.getOptions(), context.getTableName()); + context.setInputFormat(SqlServerInputFormat.class); + formatConfigurator.configureContextForImport(context, splitColumn); super.importTable(context); } @@ -165,8 +152,7 @@ public class SQLServerManager * Export data stored in HDFS into a table in a database. 
*/ @Override - public void exportTable(org.apache.sqoop.manager.ExportJobContext context) - throws IOException, ExportException { + public void exportTable(org.apache.sqoop.manager.ExportJobContext context) throws IOException, ExportException { context.setConnManager(this); // Propagate table hints to job @@ -178,15 +164,9 @@ public class SQLServerManager // Propagate whether to allow identity inserts to job configuration.setBoolean(IDENTITY_INSERT_PROP, identityInserts); - JdbcExportJob exportJob; - if (isNonResilientOperation()) { - exportJob = new JdbcExportJob(context, null, null, - SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator()); - } else { - exportJob = new JdbcExportJob(context, null, null, - SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator()); - configureConnectionRecoveryForExport(context); - } + formatConfigurator.configureContextForExport(context); + JdbcExportJob exportJob = new JdbcExportJob(context, null, null, + context.getOutputFormatClass(), getParquetJobConfigurator().createParquetExportJobConfigurator()); exportJob.runExport(); } @@ -194,17 +174,15 @@ public class SQLServerManager /** * {@inheritDoc} */ - public void updateTable( - org.apache.sqoop.manager.ExportJobContext context) + public void updateTable(org.apache.sqoop.manager.ExportJobContext context) throws IOException, ExportException { - if (isNonResilientOperation()) { - super.updateTable(context); - } else { - context.setConnManager(this); + boolean runAsExportJob = formatConfigurator.configureContextForUpdate(context, this); + if (runAsExportJob) { JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null, - null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator()); - configureConnectionRecoveryForUpdate(context); + null, context.getOutputFormatClass(), 
getParquetJobConfigurator().createParquetExportJobConfigurator()); exportJob.runExport(); + } else { + super.updateTable(context); } } @@ -391,99 +369,10 @@ public class SQLServerManager */ public void importQuery(org.apache.sqoop.manager.ImportJobContext context) throws IOException, ImportException { - if (!isNonResilientOperation()) { - // Enable connection recovery only if split column is provided - SqoopOptions opts = context.getOptions(); - String splitCol = getSplitColumn(opts, context.getTableName()); - if (splitCol != null) { - // Configure SQLServer query import jobs for connection recovery - configureConnectionRecoveryForImport(context); - } - } + String splitColumn = getSplitColumn(context.getOptions(), context.getTableName()); + formatConfigurator.configureContextForImport(context, splitColumn); super.importQuery(context); } - /** - * Configure SQLServer Sqoop Jobs to recover failed connections by using - * SQLServerConnectionFailureHandler by default. - */ - protected void configureConnectionRecoveryForImport( - org.apache.sqoop.manager.ImportJobContext context) { - - Configuration conf = context.getOptions().getConf(); - - // Configure input format class - context.setInputFormat(SQLServerDBInputFormat.class); - - // Set connection failure handler and recovery settings - // Default settings can be overridden if provided as Configuration - // properties by the user - if (conf.get(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS) - == null) { - conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS, - SQLServerConnectionFailureHandler.class.getName()); - } - } - - /** - * Configure SQLServer Sqoop export Jobs to recover failed connections by - * using SQLServerConnectionFailureHandler by default. 
- */ - protected void configureConnectionRecoveryForExport( - org.apache.sqoop.manager.ExportJobContext context) { - - Configuration conf = context.getOptions().getConf(); - - // Set connection failure handler and recovery settings - // Default settings can be overridden if provided as Configuration - // properties by the user - String clsFailureHandler = conf.get( - SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS); - if (clsFailureHandler == null) { - conf.set( - SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS, - SQLServerConnectionFailureHandler.class.getName()); - } - } - - /** - * Configure SQLServer Sqoop Update Jobs to recover connection failures by - * using SQLServerConnectionFailureHandler by default. - */ - protected void configureConnectionRecoveryForUpdate( - org.apache.sqoop.manager.ExportJobContext context) { - - Configuration conf = context.getOptions().getConf(); - - // Set connection failure handler and recovery settings - // Default settings can be overridden if provided as Configuration - // properties by the user - String clsFailureHandler = conf.get( - SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS); - if (clsFailureHandler == null) { - conf.set( - SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS, - SQLServerConnectionFailureHandler.class.getName()); - } - } - - /** - * Check if the user has requested the operation to be non resilient. 
- */ - protected boolean isNonResilientOperation() { - String [] extraArgs = options.getExtraArgs(); - if (extraArgs != null) { - // Traverse the extra options - for (int iArg = 0; iArg < extraArgs.length; ++iArg) { - String currentArg = extraArgs[iArg]; - if (currentArg.startsWith("--") - && currentArg.substring(2).equalsIgnoreCase(NON_RESILIENT_OPTION)) { - // User has explicitly requested the operation to be non-resilient - return true; - } - } - } - return false; - } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/57278b9e/src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java b/src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java new file mode 100644 index 0000000..cf58f63 --- /dev/null +++ b/src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.manager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat; +import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat; +import org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler; +import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat; + +/** + * Configures SQL Server import, export and update job contexts depending on + * whether the user requested resilient (connection-recovering) operation by + * passing --resilient among the extra arguments. Without that argument the + * non-resilient formats are used. + */ +public class SqlServerManagerContextConfigurator { + + private static final Log LOG = LogFactory.getLog(SqlServerManagerContextConfigurator.class.getName()); + + private static final String RESILIENT_OPTION = "resilient"; + + /** + * Checks whether the user has requested the operation to be resilient, i.e. + * whether an extra argument equal to --resilient (case-insensitive) is present. + * + * @param options the Sqoop options holding the extra arguments + * @return true if --resilient was passed; false otherwise, including when there + * are no extra arguments at all + */ + private boolean isResilientOperation(SqoopOptions options) { + String[] extraArgs = options.getExtraArgs(); + if (extraArgs == null) { + return false; + } + for (String currentArg : extraArgs) { + if (currentArg.startsWith("--") + && currentArg.substring(2).equalsIgnoreCase(RESILIENT_OPTION)) { + // User has explicitly requested the operation to be resilient + return true; + } + } + return false; + } + + /** + * Sets the output format of an export job. When --resilient was requested the + * resilient export output format is used and 
connection recovery is configured; + * otherwise the batch export output format is used. + * + * @param context the export job context to configure + */ + public void configureContextForExport(ExportJobContext context) { + if (isResilientOperation(context.getOptions())) { + context.setOutputFormatClass(SQLServerResilientExportOutputFormat.class); + configureConnectionRecoveryForExport(context); + } else { + context.setOutputFormatClass(SqlServerExportBatchOutputFormat.class); + } + } + + /** + * Configure SQLServer Sqoop export Jobs to recover failed connections by + * using {@link SQLServerConnectionFailureHandler}. This can be overridden by setting the + * {@link SQLServerResilientExportOutputFormat#EXPORT_FAILURE_HANDLER_CLASS} in the configuration. + */ + private void configureConnectionRecoveryForExport( + ExportJobContext context) { + + Configuration conf = context.getOptions().getConf(); + + // Set connection failure handler and recovery settings + // Can be overridden if provided as Configuration + // properties by the user + String clsFailureHandler = conf.get( + SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS); + if (clsFailureHandler == null) { + conf.set( + SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS, + SQLServerConnectionFailureHandler.class.getName()); + } + } + + /** + * Configure SQLServer Sqoop Jobs to recover failed connections by using + * {@link SQLServerConnectionFailureHandler}. This can be overridden by setting the + * {@link SQLServerDBInputFormat#IMPORT_FAILURE_HANDLER_CLASS} in the configuration. 
+ */ + private void configureConnectionRecoveryForImport( + ImportJobContext context) { + + Configuration conf = context.getOptions().getConf(); + + // Configure input format class + context.setInputFormat(SQLServerDBInputFormat.class); + + // Set connection failure handler and recovery settings + // Can be overridden if provided as Configuration + // properties by the user + if (conf.get(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS) == null) { + conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS, + SQLServerConnectionFailureHandler.class.getName()); + } + } + + /** + * Configures an import job for connection recovery when --resilient was requested. + * Recovery needs a split column; when none is available the context's input format + * is left untouched and a warning is logged, so the user learns that the request + * was not honored. + * + * @param context the import job context to configure + * @param splitCol the split-by column, or null if there is none + */ + public void configureContextForImport(ImportJobContext context, String splitCol) { + if (!isResilientOperation(context.getOptions())) { + return; + } + if (splitCol != null) { + // Configure SQLServer import jobs for connection recovery + configureConnectionRecoveryForImport(context); + } else { + LOG.warn("Resilient operation was requested but no split-by column is available; the import will run without connection recovery."); + } + } + + /** + * Configures an update job. When --resilient was requested, the context is set up + * to run as an export job: the given manager is registered on it, the resilient + * update output format is selected and connection recovery is configured. + * Otherwise the context is left unchanged. 
+ * + * @param context the export job context of the update + * @param manager the connection manager to register on the context + * @return whether the job should be executed as an export job + */ + public boolean configureContextForUpdate(ExportJobContext context, SQLServerManager manager) { + boolean runAsExportJob = isResilientOperation(context.getOptions()); + if (runAsExportJob) { + context.setConnManager(manager); + context.setOutputFormatClass(SQLServerResilientUpdateOutputFormat.class); + configureConnectionRecoveryForExport(context); + } + return runAsExportJob; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/57278b9e/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerImportTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerImportTest.java index c83c2c9..fc1c489 100644 --- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerImportTest.java +++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerImportTest.java @@ -18,36 +18,39 @@ package org.apache.sqoop.manager.sqlserver; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.sqoop.ConnFactory; +import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.manager.SQLServerManager; +import org.apache.sqoop.testutil.ArgumentArrayBuilder; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.util.FileListing; import org.junit.After; import org.junit.Before; import org.junit.Test; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; -import org.apache.sqoop.SqoopOptions; -import org.apache.sqoop.testutil.CommonArgs; -import org.apache.sqoop.testutil.ImportJobTestCase;
-import org.apache.sqoop.util.FileListing; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@RunWith(Parameterized.class) /** * Test the SQLServerManager implementation. * @@ -92,8 +95,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase { static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL"; static final String SCHEMA_SCH = "sch"; static final String SCH_TABLE_NAME = "PRIVATE_TABLE"; - static final String CONNECT_STRING = HOST_URL - + ";databaseName=" + DATABASE_NAME; + static final String CONNECT_STRING = HOST_URL + ";databaseName=" + DATABASE_NAME; static final String CONNECTOR_FACTORY = System.getProperty( "sqoop.test.msserver.connector.factory", @@ -103,8 +105,34 @@ public class SQLServerManagerImportTest extends ImportJobTestCase { private SQLServerManager manager; private Configuration conf = new Configuration(); + private Connection conn = null; + public static final String[] EXPECTED_RESULTS = new String[]{ + "1,Aaron,1000000.0,engineering", + "2,Bob,400.0,sales", + "3,Fred,15.0,marketing", + }; + + @Parameters(name = "Builder: {0}, Table: {1}") + public static Iterable<? 
extends Object> testConfigurations() { + return Arrays.asList( + new Object[] { QUERY_IMPORT, DBO_TABLE_NAME }, + new Object[] { TABLE_IMPORT, DBO_TABLE_NAME }, + new Object[] { DIFFERENT_SCHEMA_TABLE_IMPORT, SCH_TABLE_NAME }, + new Object[] { TABLE_IMPORT_WITH_EXPLICIT_SCHEMA, DBO_TABLE_NAME } + ); + } + + private static final String QUERY_IMPORT = "query import"; + private static final String TABLE_IMPORT = "table import"; + private static final String DIFFERENT_SCHEMA_TABLE_IMPORT = "different schema table import"; + private static final String TABLE_IMPORT_WITH_EXPLICIT_SCHEMA = "table import with explicit schema"; + + /** + * Creates a new argument builder for the named test configuration. + * <p> + * The {@link Parameterized} runner evaluates {@link #testConfigurations()} once and + * passes the same parameter objects to every test instance of a configuration. The + * test methods add tool options to {@link #builder}, so a shared builder would leak + * options (e.g. --resilient) from one test method into the next. A fresh builder is + * therefore created for each test instance. + * + * @param configurationName one of the configuration names listed in testConfigurations() + * @return a newly created builder for that configuration + * @throws IllegalArgumentException if the name is not a known configuration + */ + private static ArgumentArrayBuilder createArgsBuilder(String configurationName) { + if (QUERY_IMPORT.equals(configurationName)) { + return getArgsBuilderForQueryImport(); + } + if (TABLE_IMPORT.equals(configurationName)) { + return getArgsBuilderForTableImport(); + } + if (DIFFERENT_SCHEMA_TABLE_IMPORT.equals(configurationName)) { + return getArgsBuilderForDifferentSchemaTableImport(); + } + if (TABLE_IMPORT_WITH_EXPLICIT_SCHEMA.equals(configurationName)) { + return getArgsBuilderForTableImport().withToolOption("schema", SCHEMA_DBO); + } + throw new IllegalArgumentException("Unknown test configuration: " + configurationName); + } + + private final ArgumentArrayBuilder builder; + private final 
String tableName; + + public SQLServerManagerImportTest(String configurationName, String tableName) { + this.builder = createArgsBuilder(configurationName); + this.tableName = tableName; + } + @Override protected Configuration getConf() { return conf; @@ -124,8 +152,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase { public void setUp() { super.setUp(); - SqoopOptions options = new SqoopOptions(CONNECT_STRING, - DBO_TABLE_NAME); + SqoopOptions options = new SqoopOptions(CONNECT_STRING, DBO_TABLE_NAME); options.setUsername(DATABASE_USER); options.setPassword(DATABASE_PASSWORD); @@ -241,96 +268,74 @@ public class SQLServerManagerImportTest extends ImportJobTestCase { @Test public void testImportSimple() throws IOException { - String [] expectedResults = { - "1,Aaron,1000000.0,engineering", - "2,Bob,400.0,sales", - "3,Fred,15.0,marketing", - }; - - doImportAndVerify(DBO_TABLE_NAME, expectedResults); + doImportAndVerify(builder, tableName); } @Test - public void testImportExplicitDefaultSchema() throws IOException { - String [] expectedResults = { - "1,Aaron,1000000.0,engineering", - "2,Bob,400.0,sales", - "3,Fred,15.0,marketing", - }; - - String[] extraArgs = new String[] {"--schema", SCHEMA_DBO}; - - doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs); + public void testImportTableHints() throws IOException { + builder.withToolOption("table-hints", "NOLOCK"); + doImportAndVerify(builder, tableName); } @Test -
public void testImportDifferentSchema() throws IOException { - String [] expectedResults = { - "1,Aaron,1000000.0,engineering", - "2,Bob,400.0,sales", - "3,Fred,15.0,marketing", - }; - - String[] extraArgs = new String[] {"--schema", SCHEMA_SCH}; - - doImportAndVerify(SCH_TABLE_NAME, expectedResults, extraArgs); + public void testImportTableHintsMultiple() throws IOException { + builder.withToolOption("table-hints", "NOLOCK,NOWAIT"); + doImportAndVerify(builder, tableName); } @Test - public void testImportTableHints() throws IOException { - String [] expectedResults = { - "1,Aaron,1000000.0,engineering", - "2,Bob,400.0,sales", - "3,Fred,15.0,marketing", - }; - - String[] extraArgs = new String[] {"--table-hints", "NOLOCK"}; - doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs); + public void testImportTableResilient() throws IOException { + builder.withToolOption("resilient"); + doImportAndVerify(builder, tableName); } + /** + * The resilient option was named non-resilient before, but got renamed. + * This test is here to ensure backward compatibility in the sense that + * using the non-resilient option won't break any job. 
+ * + * @throws IOException + */ @Test - public void testImportTableHintsMultiple() throws IOException { - String [] expectedResults = { - "1,Aaron,1000000.0,engineering", - "2,Bob,400.0,sales", - "3,Fred,15.0,marketing", - }; + public void testImportTableNonResilient() throws IOException { + builder.withToolOption("non-resilient"); + doImportAndVerify(builder, tableName); + } - String[] extraArgs = new String[] {"--table-hints", "NOLOCK,NOWAIT"}; - doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs); + private static ArgumentArrayBuilder getArgsBuilder() { + ArgumentArrayBuilder builder = new ArgumentArrayBuilder(); + return builder.withCommonHadoopFlags(true) + .withOption("connect", CONNECT_STRING) + .withOption("username", DATABASE_USER) + .withOption("password", DATABASE_PASSWORD) + .withOption("num-mappers", "1") + .withOption("split-by", "id"); } - private String [] getArgv(String tableName, String ... extraArgs) { - ArrayList<String> args = new ArrayList<String>(); - - CommonArgs.addHadoopFlags(args); - - args.add("--table"); - args.add(tableName); - args.add("--warehouse-dir"); - args.add(getWarehouseDir()); - args.add("--connect"); - args.add(CONNECT_STRING); - args.add("--username"); - args.add(DATABASE_USER); - args.add("--password"); - args.add(DATABASE_PASSWORD); - args.add("--num-mappers"); - args.add("1"); - - if (extraArgs.length > 0) { - args.add("--"); - for (String arg : extraArgs) { - args.add(arg); - } - } + private static ArgumentArrayBuilder getArgsBuilderForTableImport() { + ArgumentArrayBuilder builder = getArgsBuilder(); + return builder.withCommonHadoopFlags(true) + .withOption("warehouse-dir", LOCAL_WAREHOUSE_DIR) + .withOption("table", DBO_TABLE_NAME); + } + + private static ArgumentArrayBuilder getArgsBuilderForQueryImport() { + ArgumentArrayBuilder builder = getArgsBuilder(); + return builder.withCommonHadoopFlags(true) + .withOption("query", "SELECT * FROM EMPLOYEES_MSSQL WHERE $CONDITIONS") + .withOption("target-dir", 
LOCAL_WAREHOUSE_DIR + "/" + DBO_TABLE_NAME); + } - return args.toArray(new String[0]); + private static ArgumentArrayBuilder getArgsBuilderForDifferentSchemaTableImport() { + ArgumentArrayBuilder builder = getArgsBuilder(); + return builder.withCommonHadoopFlags(true) + .withOption("warehouse-dir", LOCAL_WAREHOUSE_DIR) + .withOption("table", SCH_TABLE_NAME) + .withToolOption("schema", SCHEMA_SCH); } - private void doImportAndVerify(String tableName, - String [] expectedResults, - String ... extraArgs) throws IOException { + private void doImportAndVerify(ArgumentArrayBuilder argBuilder, + String tableName) throws IOException { Path warehousePath = new Path(this.getWarehouseDir()); Path tablePath = new Path(warehousePath, tableName); @@ -342,7 +347,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase { FileListing.recursiveDeleteDir(tableFile); } - String [] argv = getArgv(tableName, extraArgs); + String [] argv = argBuilder.build(); try { runImport(argv); } catch (IOException ioe) { @@ -357,7 +362,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase { try { // Read through the file and make sure it's all there. 
r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); - for (String expectedLine : expectedResults) { + for (String expectedLine : EXPECTED_RESULTS) { assertEquals(expectedLine, r.readLine()); } } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/57278b9e/src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java b/src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java new file mode 100644 index 0000000..c0d0a24 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.manager.sqlserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.manager.ExportJobContext; +import org.apache.sqoop.manager.ImportJobContext; +import org.apache.sqoop.manager.SqlServerManagerContextConfigurator; +import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat; +import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat; +import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat; +import org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler; +import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat; +import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat; +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests how {@link SqlServerManagerContextConfigurator} configures import, export + * and update job contexts with and without the --resilient extra argument, + * including the default (no extra arguments) behavior. + */ +public class TestSqlServerManagerContextConfigurator { + + public static final Log LOG = LogFactory.getLog(TestSqlServerManagerContextConfigurator.class.getName()); + + private final SqlServerManagerContextConfigurator formatConfigurator = new SqlServerManagerContextConfigurator(); + + private SqoopOptions options; + + @Test + public void testResilientImportContextConfiguration() { + String[] extraArgs = {"--resilient"}; + options.setExtraArgs(extraArgs); + + ImportJobContext context = new ImportJobContext("TABLE_NAME", "example.jar", options, null); + formatConfigurator.configureContextForImport(context, "id"); + Class<? 
extends InputFormat> inputFormat = context.getInputFormat(); + assertThat(inputFormat).isSameAs(SQLServerDBInputFormat.class); + } + + /** + * Without a split column, resilient import cannot be configured, so the + * context's default input format must be kept. + */ + @Test + public void testResilientImportWithoutSplitColumnContextConfiguration() { + String[] extraArgs = {"--resilient"}; + options.setExtraArgs(extraArgs); + + ImportJobContext context = new ImportJobContext("TABLE_NAME", "example.jar", options, null); + formatConfigurator.configureContextForImport(context, null); + Class<? extends InputFormat> inputFormat = context.getInputFormat(); + assertThat(inputFormat).isSameAs(DataDrivenDBInputFormat.class); + } + + @Test + public void testNonResilientImportContextConfiguration() { + String[] extraArgs = {"--non-resilient"}; + options.setExtraArgs(extraArgs); + + ImportJobContext context = new ImportJobContext("TABLE_NAME", "example.jar", options, null); + formatConfigurator.configureContextForImport(context, "id"); + Class<? extends InputFormat> inputFormat = context.getInputFormat(); + assertThat(inputFormat).isSameAs(DataDrivenDBInputFormat.class); + } + + /** + * Non-resilient import is the default when no extra arguments are given. + */ + @Test + public void testDefaultImportContextConfiguration() { + ImportJobContext context = new ImportJobContext("TABLE_NAME", "example.jar", options, null); + formatConfigurator.configureContextForImport(context, "id"); + Class<? 
extends InputFormat> inputFormat = context.getInputFormat(); + assertThat(inputFormat).isSameAs(DataDrivenDBInputFormat.class); + } + + @Test + public void testResilientExportContextConfiguration() { + String[] extraArgs = {"--resilient"}; + options.setExtraArgs(extraArgs); + + ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options); + formatConfigurator.configureContextForExport(context); + Class outputFormatClass = context.getOutputFormatClass(); + assertThat(outputFormatClass).isSameAs(SQLServerResilientExportOutputFormat.class); + } + + /** + * Resilient export must register the default connection failure handler + * when the user has not configured one. + */ + @Test + public void testResilientExportConfiguresDefaultFailureHandler() { + String[] extraArgs = {"--resilient"}; + options.setExtraArgs(extraArgs); + + ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options); + formatConfigurator.configureContextForExport(context); + String failureHandler = options.getConf().get(SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS); + assertThat(failureHandler).isEqualTo(SQLServerConnectionFailureHandler.class.getName()); + } + + @Test + public void testNonResilientExportContextConfiguration() { + String[] extraArgs = {"--non-resilient"}; + options.setExtraArgs(extraArgs); + + ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options); + formatConfigurator.configureContextForExport(context); + Class outputFormatClass = context.getOutputFormatClass(); + assertThat(outputFormatClass).isSameAs(SqlServerExportBatchOutputFormat.class); + } + + /** + * Non-resilient (batch) export is the default when no extra arguments are given. 
+ */ + @Test + public void testDefaultExportContextConfiguration() { + ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options); + formatConfigurator.configureContextForExport(context); + Class outputFormatClass = context.getOutputFormatClass(); + assertThat(outputFormatClass).isSameAs(SqlServerExportBatchOutputFormat.class); + } + + @Test + public void testResilientUpdateContextConfiguration() { + String[] extraArgs = {"--resilient"}; + options.setExtraArgs(extraArgs); + + ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options); + formatConfigurator.configureContextForUpdate(context, null); + Class outputFormatClass = context.getOutputFormatClass(); + assertThat(outputFormatClass).isSameAs(SQLServerResilientUpdateOutputFormat.class); + } + + @Test + public void testNonResilientUpdateContextConfiguration() { + String[] extraArgs = {"--non-resilient"}; + options.setExtraArgs(extraArgs); + + ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options); + formatConfigurator.configureContextForUpdate(context, null); + Class outputFormatClass = context.getOutputFormatClass(); + assertThat(outputFormatClass).isNull(); + } + + @Before + public void setUp() { + Configuration conf = new Configuration(); + this.options = new SqoopOptions(conf); + } +}
