Updated Branches: refs/heads/trunk 03fa9c530 -> d3758915b
SQOOP-1278: Allow use of uncommitted isolation for databases that support it as an import option (Venkat Ranganathan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d3758915 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d3758915 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d3758915 Branch: refs/heads/trunk Commit: d3758915bb8b265ec157cd4bc21a7e17d922458e Parents: 03fa9c5 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Feb 4 13:01:19 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Feb 4 13:01:19 2014 -0800 ---------------------------------------------------------------------- src/docs/user/common-args.txt | 2 + src/docs/user/import.txt | 12 ++++++ src/java/org/apache/sqoop/SqoopOptions.java | 16 ++++++++ .../sqoop/mapreduce/DataDrivenImportJob.java | 8 +++- .../sqoop/mapreduce/db/DBConfiguration.java | 9 +++++ .../sqoop/mapreduce/db/DBInputFormat.java | 39 +++++++++++++++++--- .../org/apache/sqoop/tool/BaseSqoopTool.java | 9 +++++ .../com/cloudera/sqoop/TestSqoopOptions.java | 7 ++++ 8 files changed, 95 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/docs/user/common-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/common-args.txt b/src/docs/user/common-args.txt index 8a017f4..98f19be 100644 --- a/src/docs/user/common-args.txt +++ b/src/docs/user/common-args.txt @@ -37,4 +37,6 @@ Argument Description +\--verbose+ Print more information while working +\--connection-param-file <filename>+ Optional properties file that\ provides connection parameters ++\--relaxed-isolation+ Set connection transaction isolation\ + to read uncommitted for the mappers. 
------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/docs/user/import.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index 0db6d97..7a3fa43 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -285,6 +285,18 @@ are expected to be present in the shell path of the task process. For MySQL the utilities +mysqldump+ and +mysqlimport+ are required, whereas for PostgreSQL the utility +psql+ is required. +Controlling transaction isolation +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default, Sqoop uses the read committed transaction isolation level in the mappers +to import data. This may not be ideal in all ETL workflows, and it may be +desirable to reduce the isolation guarantees. The +\--relaxed-isolation+ option +can be used to instruct Sqoop to use the read uncommitted isolation level. + +The +read-uncommitted+ isolation level is not supported on all databases +(for example, Oracle), so specifying the option +\--relaxed-isolation+ +may not be supported on all databases. + Controlling type mapping ^^^^^^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 46e158c..f1b8b13 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -300,6 +300,9 @@ public class SqoopOptions implements Cloneable { // Accumulo zookeeper @StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers; + // Relaxed Isolation + @StoredAsProperty("relaxed.isolation") private boolean relaxedIsolation; + // These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will // be populated by the JobStorage to facilitate updating the same @@ -962,6 +965,10 @@ public class SqoopOptions implements Cloneable { this.validatorClass = RowCountValidator.class; this.validationThresholdClass = AbsoluteValidationThreshold.class; this.validationFailureHandlerClass = AbortOnFailureHandler.class; + + // Relaxed isolation will not enabled by default which is the behavior + // of sqoop until now. + this.relaxedIsolation = false; } /** @@ -2449,4 +2456,13 @@ public class SqoopOptions implements Cloneable { public boolean isSkipDistCache() { return this.skipDistCache; } + + public void setRelaxedIsolation(boolean b) { + this.relaxedIsolation = true; + + } + + public boolean getRelaxedIsolation() { + return this.relaxedIsolation; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java index b21560e..6dcfebb 100644 --- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java @@ -247,7 +247,13 @@ public class DataDrivenImportJob extends ImportJobBase { new DBConfiguration(job.getConfiguration()).setInputOrderBy( splitByCol); } - + if (options.getRelaxedIsolation()) { + LOG + .info("Enabling relaxed (read uncommitted) transaction " + + "isolation for imports"); + job.getConfiguration() + .setBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, true); + } LOG.debug("Using table class: " + tableClassName); job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(), tableClassName); http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java index be942ce..a9b7e42 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java @@ -122,6 +122,13 @@ public class DBConfiguration { "mapreduce.jdbc.output.field.count"; /** + * The name of the parameter to use for making Isolation level to be + * read uncommitted by default for connections. + */ + public static final String PROP_RELAXED_ISOLATION = + "org.apache.sqoop.db.relaxedisolation"; + + /** * Sets the DB access related fields in the {@link Configuration}. * @param conf the configuration * @param driverClass JDBC Driver class name @@ -150,6 +157,7 @@ public class DBConfiguration { conf.set(CONNECTION_PARAMS_PROPERTY, propertiesToString(connectionParams)); } + } // set the password in the secure credentials object @@ -295,6 +303,7 @@ public class DBConfiguration { connectString, username, password); } } + return connection; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java index 73ed94e..3a8e5d0 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java @@ -29,6 +29,8 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -58,7 +60,8 @@ import 
com.cloudera.sqoop.mapreduce.db.OracleDBRecordReader; public class DBInputFormat<T extends DBWritable> extends InputFormat<LongWritable, T> implements Configurable { - + public static final Log LOG = LogFactory.getLog( + DBInputFormat.class.getName()); private String dbProductName = "DEFAULT"; /** @@ -160,9 +163,6 @@ extends InputFormat<LongWritable, T> implements Configurable { try { getConnection(); - - DatabaseMetaData dbMeta = connection.getMetaData(); - this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -172,6 +172,31 @@ extends InputFormat<LongWritable, T> implements Configurable { conditions = dbConf.getInputConditions(); } + /** Applies the PROP_RELAXED_ISOLATION setting to conn. */ + private void setTxIsolation(Connection conn) { + try { + if (getConf() + .getBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, false)) { + if (dbProductName.startsWith("ORACLE")) { + // Oracle lacks read uncommitted; use read committed. + LOG.info("Using read committed transaction isolation for Oracle" + + " as read uncommitted is not supported"); + conn.setTransactionIsolation( + Connection.TRANSACTION_READ_COMMITTED); + } else { + LOG.info("Using read uncommitted transaction isolation"); + conn.setTransactionIsolation( + Connection.TRANSACTION_READ_UNCOMMITTED); + } + } else { + LOG.info("Using read committed transaction isolation"); + conn.setTransactionIsolation( + Connection.TRANSACTION_READ_COMMITTED); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } public Configuration getConf() { return dbConf.getConf(); } @@ -182,12 +207,14 @@ extends InputFormat<LongWritable, T> implements Configurable { public Connection getConnection() { try { + if (null == this.connection) { // The connection was closed; reinstantiate it.
        this.connection = dbConf.getConnection(); this.connection.setAutoCommit(false); - this.connection.setTransactionIsolation( - Connection.TRANSACTION_READ_COMMITTED); + DatabaseMetaData dbMeta = connection.getMetaData(); + this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); + setTxIsolation(connection); } } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 6d6f1ea..ceda9f3 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -155,6 +155,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String UPDATE_MODE_ARG = "update-mode"; public static final String CALL_ARG = "call"; public static final String SKIP_DISTCACHE_ARG = "skip-dist-cache"; + public static final String RELAXED_ISOLATION = "relaxed-isolation"; // Arguments for validation.
  public static final String VALIDATE_ARG = "validate"; @@ -444,6 +445,11 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { .withDescription("Print usage instructions") .withLongOpt(HELP_ARG) .create()); + // relax isolation requirements + commonOpts.addOption(OptionBuilder + .withDescription("Use read-uncommitted isolation for imports") + .withLongOpt(RELAXED_ISOLATION) + .create()); return commonOpts; } @@ -969,6 +975,9 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { } else if (in.hasOption(HADOOP_HOME_ARG)) { out.setHadoopMapRedHome(in.getOptionValue(HADOOP_HOME_ARG)); } + if (in.hasOption(RELAXED_ISOLATION)) { + out.setRelaxedIsolation(true); + } } private void applyCredentialsOptions(CommandLine in, SqoopOptions out) http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/test/com/cloudera/sqoop/TestSqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index 686d398..60460d9 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -468,4 +468,11 @@ public class TestSqoopOptions extends TestCase { private static String longArgument(String argument) { return String.format("--%s", argument); } + + public void testRelaxedIsolation() throws Exception { + String[] extraArgs = { + "--relaxed-isolation", + }; + validateImportOptions(extraArgs); + } }
