Repository: hadoop Updated Branches: refs/heads/trunk 7e42088ab -> 241336ca2
MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of reusing connections. Contributed by Kannan Rajah. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/241336ca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/241336ca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/241336ca Branch: refs/heads/trunk Commit: 241336ca2b7cf97d7e0bd84dbe0542b72f304dc9 Parents: 7e42088 Author: Tsuyoshi Ozawa <oz...@apache.org> Authored: Tue Feb 10 03:52:42 2015 +0900 Committer: Tsuyoshi Ozawa <oz...@apache.org> Committed: Tue Feb 10 03:52:42 2015 +0900 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 15 ++++++++++ .../hadoop/mapreduce/lib/db/DBInputFormat.java | 31 ++++++++++++-------- .../lib/db/DataDrivenDBInputFormat.java | 5 ++-- .../lib/db/OracleDataDrivenDBInputFormat.java | 2 +- 4 files changed, 37 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/241336ca/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 583c6c1..c71fee8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -346,6 +346,21 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6233. org.apache.hadoop.mapreduce.TestLargeSort.testLargeSort failed in trunk (zxu via rkanter) +Release 2.6.1 - UNRELEASED + + INCOMPATIBLE CHANGES + + NEW FEATURES + + IMPROVEMENTS + + OPTIMIZATIONS + + BUG FIXES + + MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of + reusing connections. 
(Kannan Rajah via ozawa) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/241336ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java index c0530c2..00fbeda 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java @@ -159,7 +159,7 @@ public class DBInputFormat<T extends DBWritable> dbConf = new DBConfiguration(conf); try { - getConnection(); + this.connection = createConnection(); DatabaseMetaData dbMeta = connection.getMetaData(); this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); @@ -182,18 +182,25 @@ public class DBInputFormat<T extends DBWritable> } public Connection getConnection() { + // TODO Remove this code that handles backward compatibility. + if (this.connection == null) { + this.connection = createConnection(); + } + + return this.connection; + } + + public Connection createConnection() { try { - if (null == this.connection) { - // The connection was closed; reinstantiate it. 
- this.connection = dbConf.getConnection(); - this.connection.setAutoCommit(false); - this.connection.setTransactionIsolation( - Connection.TRANSACTION_SERIALIZABLE); - } + Connection newConnection = dbConf.getConnection(); + newConnection.setAutoCommit(false); + newConnection.setTransactionIsolation( + Connection.TRANSACTION_SERIALIZABLE); + + return newConnection; } catch (Exception e) { throw new RuntimeException(e); } - return connection; } public String getDBProductName() { @@ -210,17 +217,17 @@ public class DBInputFormat<T extends DBWritable> if (dbProductName.startsWith("ORACLE")) { // use Oracle-specific db reader. return new OracleDBRecordReader<T>(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, + conf, createConnection(), getDBConf(), conditions, fieldNames, tableName); } else if (dbProductName.startsWith("MYSQL")) { // use MySQL-specific db reader. return new MySQLDBRecordReader<T>(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, + conf, createConnection(), getDBConf(), conditions, fieldNames, tableName); } else { // Generic reader. 
return new DBRecordReader<T>(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, + conf, createConnection(), getDBConf(), conditions, fieldNames, tableName); } } catch (SQLException ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/241336ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java index 131b7bb..753c880 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java @@ -178,7 +178,6 @@ public class DataDrivenDBInputFormat<T extends DBWritable> ResultSet results = null; Statement statement = null; - Connection connection = getConnection(); try { statement = connection.createStatement(); @@ -289,12 +288,12 @@ public class DataDrivenDBInputFormat<T extends DBWritable> if (dbProductName.startsWith("MYSQL")) { // use MySQL-specific db reader. return new MySQLDataDrivenDBRecordReader<T>(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), + conf, createConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName()); } else { // Generic reader. 
return new DataDrivenDBRecordReader<T>(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), + conf, createConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName(), dbProductName); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/241336ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java index 8fbd473..a02471e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java @@ -84,7 +84,7 @@ public class OracleDataDrivenDBInputFormat<T extends DBWritable> try { // Use Oracle-specific db reader return new OracleDataDrivenDBRecordReader<T>(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), + conf, createConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName()); } catch (SQLException ex) { throw new IOException(ex.getMessage());