Updated Branches:
  refs/heads/trunk 03fa9c530 -> d3758915b

SQOOP-1278: Allow use of uncommitted isolation for databases that support it as 
an import option

(Venkat Ranganathan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d3758915
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d3758915
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d3758915

Branch: refs/heads/trunk
Commit: d3758915bb8b265ec157cd4bc21a7e17d922458e
Parents: 03fa9c5
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Feb 4 13:01:19 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Feb 4 13:01:19 2014 -0800

----------------------------------------------------------------------
 src/docs/user/common-args.txt                   |  2 +
 src/docs/user/import.txt                        | 12 ++++++
 src/java/org/apache/sqoop/SqoopOptions.java     | 16 ++++++++
 .../sqoop/mapreduce/DataDrivenImportJob.java    |  8 +++-
 .../sqoop/mapreduce/db/DBConfiguration.java     |  9 +++++
 .../sqoop/mapreduce/db/DBInputFormat.java       | 39 +++++++++++++++++---
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |  9 +++++
 .../com/cloudera/sqoop/TestSqoopOptions.java    |  7 ++++
 8 files changed, 95 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/docs/user/common-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/common-args.txt b/src/docs/user/common-args.txt
index 8a017f4..98f19be 100644
--- a/src/docs/user/common-args.txt
+++ b/src/docs/user/common-args.txt
@@ -37,4 +37,6 @@ Argument                                  Description
 +\--verbose+                              Print more information while working
 +\--connection-param-file <filename>+     Optional properties file that\
                                           provides connection parameters
++\--relaxed-isolation+                    Set connection transaction isolation\
+                                          to read uncommitted for the mappers.
 -------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/docs/user/import.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 0db6d97..7a3fa43 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -285,6 +285,18 @@ are expected to be present in the shell path of the task 
process. For MySQL
 the utilities +mysqldump+ and +mysqlimport+ are required, whereas for
 PostgreSQL the utility +psql+ is required.
 
+Controlling transaction isolation
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default, Sqoop uses the read committed transaction isolation in the mappers
+to import data.  This may not be ideal in all ETL workflows, and it may be
+desired to reduce the isolation guarantees.  The +\--relaxed-isolation+ option
+can be used to instruct Sqoop to use the read uncommitted isolation level.
+
+The +read-uncommitted+ isolation level is not supported on all databases
+(for example, Oracle), so specifying the option +\--relaxed-isolation+
+may not be supported on all databases.
+
 Controlling type mapping
 ^^^^^^^^^^^^^^^^^^^^^^^^
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java 
b/src/java/org/apache/sqoop/SqoopOptions.java
index 46e158c..f1b8b13 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -300,6 +300,9 @@ public class SqoopOptions implements Cloneable {
   // Accumulo zookeeper
   @StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers;
 
+  // Relaxed Isolation
+  @StoredAsProperty("relaxed.isolation") private boolean relaxedIsolation;
+
   // These next two fields are not serialized to the metastore.
   // If this SqoopOptions is created by reading a saved job, these will
   // be populated by the JobStorage to facilitate updating the same
@@ -962,6 +965,10 @@ public class SqoopOptions implements Cloneable {
     this.validatorClass = RowCountValidator.class;
     this.validationThresholdClass = AbsoluteValidationThreshold.class;
     this.validationFailureHandlerClass = AbortOnFailureHandler.class;
+
+    // Relaxed isolation is not enabled by default, which preserves the
+    // behavior of sqoop until now.
+    this.relaxedIsolation = false;
   }
 
   /**
@@ -2449,4 +2456,13 @@ public class SqoopOptions implements Cloneable {
   public boolean isSkipDistCache() {
     return this.skipDistCache;
   }
+
+  public void setRelaxedIsolation(boolean b) {
+    // Store the caller's flag; a hard-coded true made false impossible to set.
+    this.relaxedIsolation = b;
+  }
+
+  public boolean getRelaxedIsolation() {
+    return this.relaxedIsolation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java 
b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index b21560e..6dcfebb 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -247,7 +247,13 @@ public class DataDrivenImportJob extends ImportJobBase {
         new DBConfiguration(job.getConfiguration()).setInputOrderBy(
             splitByCol);
       }
-
+      if (options.getRelaxedIsolation()) {
+        LOG
+          .info("Enabling relaxed (read uncommitted) transaction "
+             + "isolation for imports");
+        job.getConfiguration()
+          .setBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, true);
+      }
       LOG.debug("Using table class: " + tableClassName);
       job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(),
           tableClassName);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java 
b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
index be942ce..a9b7e42 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
@@ -122,6 +122,13 @@ public class DBConfiguration {
     "mapreduce.jdbc.output.field.count";
 
   /**
+   * The name of the configuration property that makes connections use the
+   * read uncommitted transaction isolation level.
+   */
+  public static final String PROP_RELAXED_ISOLATION =
+      "org.apache.sqoop.db.relaxedisolation";
+
+  /**
    * Sets the DB access related fields in the {@link Configuration}.
    * @param conf the configuration
    * @param driverClass JDBC Driver class name
@@ -150,6 +157,7 @@ public class DBConfiguration {
       conf.set(CONNECTION_PARAMS_PROPERTY,
                propertiesToString(connectionParams));
     }
+
   }
 
   // set the password in the secure credentials object
@@ -295,6 +303,7 @@ public class DBConfiguration {
                         connectString, username, password);
       }
     }
+
     return connection;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java 
b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
index 73ed94e..3a8e5d0 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
@@ -29,6 +29,8 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
@@ -58,7 +60,8 @@ import com.cloudera.sqoop.mapreduce.db.OracleDBRecordReader;
 public class DBInputFormat<T extends DBWritable>
 extends InputFormat<LongWritable, T> implements Configurable  {
 
-
+  public static final Log LOG = LogFactory.getLog(
+    DBInputFormat.class.getName());
   private String dbProductName = "DEFAULT";
 
   /**
@@ -160,9 +163,6 @@ extends InputFormat<LongWritable, T> implements 
Configurable  {
 
     try {
       getConnection();
-
-      DatabaseMetaData dbMeta = connection.getMetaData();
-      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -172,6 +172,31 @@ extends InputFormat<LongWritable, T> implements 
Configurable  {
     conditions = dbConf.getInputConditions();
   }
 
+  /**
+   * Sets read uncommitted isolation on conn when relaxed isolation is
+   * configured (read committed for Oracle), else read committed.
+   */
+  private void setTxIsolation(Connection conn) {
+    try {
+      if (getConf()
+        .getBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, false)) {
+        if (dbProductName.startsWith("ORACLE")) {
+          LOG.info("Using read committed transaction isolation for Oracle"
+            + " as read uncommitted is not supported");
+          conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+        } else {
+          LOG.info("Using read uncommitted transaction isolation");
+          conn.setTransactionIsolation(
+            Connection.TRANSACTION_READ_UNCOMMITTED);
+        }
+      } else {
+        LOG.info("Using read committed transaction isolation");
+        conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
   public Configuration getConf() {
     return dbConf.getConf();
   }
@@ -182,12 +207,14 @@ extends InputFormat<LongWritable, T> implements 
Configurable  {
 
   public Connection getConnection() {
     try {
+
       if (null == this.connection) {
         // The connection was closed; reinstantiate it.
         this.connection = dbConf.getConnection();
         this.connection.setAutoCommit(false);
-        this.connection.setTransactionIsolation(
-            Connection.TRANSACTION_READ_COMMITTED);
+        DatabaseMetaData dbMeta = connection.getMetaData();
+        this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
+        setTxIsolation(connection);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java 
b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 6d6f1ea..ceda9f3 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -155,6 +155,7 @@ public abstract class BaseSqoopTool extends 
com.cloudera.sqoop.tool.SqoopTool {
   public static final String UPDATE_MODE_ARG = "update-mode";
   public static final String CALL_ARG = "call";
   public static final String SKIP_DISTCACHE_ARG = "skip-dist-cache";
+  public static final String RELAXED_ISOLATION = "relaxed-isolation";
 
   // Arguments for validation.
   public static final String VALIDATE_ARG = "validate";
@@ -444,6 +445,11 @@ public abstract class BaseSqoopTool extends 
com.cloudera.sqoop.tool.SqoopTool {
         .withDescription("Print usage instructions")
         .withLongOpt(HELP_ARG)
         .create());
+    // relax isolation requirements
+    commonOpts.addOption(OptionBuilder
+        .withDescription("Use read-uncommitted isolation for imports")
+        .withLongOpt(RELAXED_ISOLATION)
+        .create());
 
     return commonOpts;
   }
@@ -969,6 +975,9 @@ public abstract class BaseSqoopTool extends 
com.cloudera.sqoop.tool.SqoopTool {
     } else if (in.hasOption(HADOOP_HOME_ARG)) {
         out.setHadoopMapRedHome(in.getOptionValue(HADOOP_HOME_ARG));
     }
+    if (in.hasOption(RELAXED_ISOLATION)) {
+      out.setRelaxedIsolation(true);
+    }
   }
 
   private void applyCredentialsOptions(CommandLine in, SqoopOptions out)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3758915/src/test/com/cloudera/sqoop/TestSqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java 
b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index 686d398..60460d9 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -468,4 +468,11 @@ public class TestSqoopOptions extends TestCase {
   private static String longArgument(String argument) {
     return String.format("--%s", argument);
   }
+
+  public void testRelaxedIsolation() throws Exception {
+    String extraArgs[] = {
+      "--relaxed-isolation",
+    };
+    validateImportOptions(extraArgs);
+  }
 }

Reply via email to