Updated Branches: refs/heads/trunk 95098af0e -> 817195ebb
SQOOP-382: Connection parameters should be used on the mapper (David Robson via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/817195eb Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/817195eb Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/817195eb Branch: refs/heads/trunk Commit: 817195ebb0bd54c9a81f1f1960858f0b495e63bd Parents: 95098af Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Nov 26 16:26:18 2012 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Nov 26 16:26:18 2012 -0800 ---------------------------------------------------------------------- .../sqoop/mapreduce/DataDrivenImportJob.java | 5 +- .../org/apache/sqoop/mapreduce/JdbcExportJob.java | 6 +- .../sqoop/mapreduce/JdbcUpdateExportJob.java | 6 +- .../sqoop/mapreduce/JdbcUpsertExportJob.java | 6 +- .../apache/sqoop/mapreduce/MySQLDumpImportJob.java | 5 +- .../org/apache/sqoop/mapreduce/MySQLExportJob.java | 5 +- .../sqoop/mapreduce/PGBulkloadExportJob.java | 6 +- .../apache/sqoop/mapreduce/db/DBConfiguration.java | 161 ++++++++++++++- .../sqoop/manager/PGBulkloadManagerManualTest.java | 2 +- .../mapreduce/db/TestDataDrivenDBInputFormat.java | 2 +- .../sqoop/mapreduce/db/TestDBConfiguration.java | 61 ++++++ 11 files changed, 238 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java index 9e5f102..ef1d363 100644 --- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java @@ -150,11 +150,12 @@ public class DataDrivenImportJob extends ImportJobBase { if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - options.getFetchSize()); + options.getFetchSize(), options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - username, options.getPassword(), options.getFetchSize()); + username, options.getPassword(), options.getFetchSize(), + options.getConnectionParams()); } if (null != tableName) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java index bd52f00..7c52110 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java @@ -115,12 +115,14 @@ public class JdbcExportJob extends ExportJobBase { if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString()); + options.getConnectString(), + options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - username, options.getPassword()); + username, options.getPassword(), + options.getConnectionParams()); } String [] colNames = options.getColumns(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java index 21cb128..8fa420e 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java @@ -83,12 +83,14 @@ public class JdbcUpdateExportJob extends ExportJobBase { if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString()); + options.getConnectString(), + options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - username, options.getPassword()); + username, options.getPassword(), + options.getConnectionParams()); } String [] colNames = options.getColumns(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java index c17b4bb..0a9bf7f 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java @@ -56,12 +56,14 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob { if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), - options.getConnectString()); + options.getConnectString(), + options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - username, options.getPassword()); + username, options.getPassword(), + options.getConnectionParams()); } String [] colNames = options.getColumns(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java b/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java index 634bd34..43fbec4 100644 --- a/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java @@ -63,11 +63,12 @@ public class MySQLDumpImportJob extends ImportJobBase { String username = options.getUsername(); if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), - mgr.getDriverClass(), options.getConnectString()); + mgr.getDriverClass(), options.getConnectString(), + options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), username, - options.getPassword()); + options.getPassword(), options.getConnectionParams()); } String [] colNames = options.getColumns(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java b/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java index 0523901..16bdd74 100644 --- a/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java @@ -73,11 +73,12 @@ public class MySQLExportJob extends ExportJobBase { String username = options.getUsername(); if (null == username || username.length() == 0) { DBConfiguration.configureDB(job.getConfiguration(), - mgr.getDriverClass(), options.getConnectString()); + mgr.getDriverClass(), options.getConnectString(), + options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), username, - options.getPassword()); + options.getPassword(), options.getConnectionParams()); } String [] colNames = options.getColumns(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java index f3f094b..cc60233 100644 --- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java @@ -72,13 +72,15 @@ public class PGBulkloadExportJob extends ExportJobBase { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), - options.getFetchSize()); + options.getFetchSize(), + options.getConnectionParams()); } else { DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(), options.getConnectString(), username, options.getPassword(), - options.getFetchSize()); + options.getFetchSize(), + options.getConnectionParams()); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java index 22993df..d270bc8 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java @@ -20,8 +20,15 @@ package org.apache.sqoop.mapreduce.db; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrTokenizer; import org.apache.sqoop.mapreduce.DBWritable; import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable; @@ -55,6 +62,10 @@ public class DBConfiguration { /** Password to access the database. */ public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password"; + /** JDBC connection parameters. */ + public static final String CONNECTION_PARAMS_PROPERTY = + "mapreduce.jdbc.params"; + /** Fetch size. */ public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize"; @@ -109,9 +120,11 @@ public class DBConfiguration { * @param userName DB access username * @param passwd DB access passwd * @param fetchSize DB fetch size + * @param connectionParams JDBC connection parameters */ public static void configureDB(Configuration conf, String driverClass, - String dbUrl, String userName, String passwd, Integer fetchSize) { + String dbUrl, String userName, String passwd, Integer fetchSize, + Properties connectionParams) { conf.set(DRIVER_CLASS_PROPERTY, driverClass); conf.set(URL_PROPERTY, dbUrl); @@ -124,6 +137,67 @@ public class DBConfiguration { if (fetchSize != null) { conf.setInt(FETCH_SIZE, fetchSize); } + if (connectionParams != null) { + conf.set(CONNECTION_PARAMS_PROPERTY, + propertiesToString(connectionParams)); + } + } + + /** + * Sets the DB access related fields in the JobConf. + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL + * @param fetchSize DB fetch size + * @param connectionParams JDBC connection parameters + */ + public static void configureDB(Configuration job, String driverClass, + String dbUrl, Integer fetchSize, Properties connectionParams) { + configureDB(job, driverClass, dbUrl, null, null, fetchSize, + connectionParams); + } + + /** + * Sets the DB access related fields in the {@link Configuration}. + * @param conf the configuration + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL + * @param userName DB access username + * @param passwd DB access passwd + * @param connectionParams JDBC connection parameters + */ + public static void configureDB(Configuration conf, String driverClass, + String dbUrl, String userName, String passwd, + Properties connectionParams) { + configureDB(conf, driverClass, dbUrl, userName, passwd, null, + connectionParams); + } + + /** + * Sets the DB access related fields in the JobConf. + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL. + * @param connectionParams JDBC connection parameters + */ + public static void configureDB(Configuration job, String driverClass, + String dbUrl, Properties connectionParams) { + configureDB(job, driverClass, dbUrl, null, connectionParams); + } + + /** + * Sets the DB access related fields in the {@link Configuration}. + * @param conf the configuration + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL + * @param userName DB access username + * @param passwd DB access passwd + * @param fetchSize DB fetch size + */ + public static void configureDB(Configuration conf, String driverClass, + String dbUrl, String userName, String passwd, Integer fetchSize) { + configureDB(conf, driverClass, dbUrl, userName, passwd, fetchSize, + (Properties) null); } /** @@ -135,7 +209,7 @@ public class DBConfiguration { */ public static void configureDB(Configuration job, String driverClass, String dbUrl, Integer fetchSize) { - configureDB(job, driverClass, dbUrl, null, null, fetchSize); + configureDB(job, driverClass, dbUrl, fetchSize, (Properties) null); } /** @@ -148,7 +222,7 @@ public class DBConfiguration { */ public static void configureDB(Configuration conf, String driverClass, String dbUrl, String userName, String passwd) { - configureDB(conf, driverClass, dbUrl, userName, passwd, null); + configureDB(conf, driverClass, dbUrl, userName, passwd, (Properties) null); } /** @@ -159,7 +233,7 @@ public class DBConfiguration { */ public static void configureDB(Configuration job, String driverClass, String dbUrl) { - configureDB(job, driverClass, dbUrl, null); + configureDB(job, driverClass, dbUrl, (Properties) null); } @@ -174,18 +248,38 @@ public class DBConfiguration { * @throws SQLException */ public Connection getConnection() throws ClassNotFoundException, SQLException { + Connection connection; Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); - if (conf.get(DBConfiguration.USERNAME_PROPERTY) == null) { - return DriverManager.getConnection( - conf.get(DBConfiguration.URL_PROPERTY)); + String username = conf.get(DBConfiguration.USERNAME_PROPERTY); + String password = conf.get(DBConfiguration.PASSWORD_PROPERTY); + String connectString = conf.get(DBConfiguration.URL_PROPERTY); + String connectionParamsStr = + conf.get(DBConfiguration.CONNECTION_PARAMS_PROPERTY); + Properties connectionParams = propertiesFromString(connectionParamsStr); + + if (connectionParams != null && connectionParams.size() > 0) { + Properties props = new Properties(); + if (username != null) { + props.put("user", username); + } + + if (password != null) { + props.put("password", password); + } + + props.putAll(connectionParams); + connection = DriverManager.getConnection(connectString, props); } else { - return DriverManager.getConnection( - conf.get(DBConfiguration.URL_PROPERTY), - conf.get(DBConfiguration.USERNAME_PROPERTY), - conf.get(DBConfiguration.PASSWORD_PROPERTY)); + if (username == null) { + connection = DriverManager.getConnection(connectString); + } else { + connection = DriverManager.getConnection( + connectString, username, password); + } } + return connection; } public Configuration getConf() { @@ -306,4 +400,49 @@ public class DBConfiguration { return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0); } + /** + * Converts connection properties to a String to be passed to the mappers. + * @param properties JDBC connection parameters + * @return String to be passed to configuration + */ + protected static String propertiesToString(Properties properties) { + List<String> propertiesList = new ArrayList<String>(properties.size()); + for(Entry<Object, Object> property : properties.entrySet()) { + String key = StringEscapeUtils.escapeCsv(property.getKey().toString()); + if (key.equals(property.getKey().toString()) && key.contains("=")) { + key = "\"" + key + "\""; + } + String val = StringEscapeUtils.escapeCsv(property.getValue().toString()); + if (val.equals(property.getValue().toString()) && val.contains("=")) { + val = "\"" + val + "\""; + } + propertiesList.add(StringEscapeUtils.escapeCsv(key + "=" + val)); + } + return StringUtils.join(propertiesList, ','); + } + + /** + * Converts a String back to connection parameters. + * @param input String from configuration + * @return JDBC connection parameters + */ + protected static Properties propertiesFromString(String input) { + if (input != null && !input.isEmpty()) { + Properties result = new Properties(); + StrTokenizer propertyTokenizer = StrTokenizer.getCSVInstance(input); + StrTokenizer valueTokenizer = StrTokenizer.getCSVInstance(); + valueTokenizer.setDelimiterChar('='); + while (propertyTokenizer.hasNext()){ + valueTokenizer.reset(propertyTokenizer.nextToken()); + String[] values = valueTokenizer.getTokenArray(); + if (values.length==2) { + result.put(values[0], values[1]); + } + } + return result; + } else { + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java index fff35dc..0403614 100644 --- a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java @@ -72,7 +72,7 @@ public class PGBulkloadManagerManualTest extends TestExport { "org.postgresql.Driver", getConnectString(), getUserName(), - null, null); + (String) null, (Integer) null); dbConf = new DBConfiguration(conf); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java b/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java index 6b4214c..fed22b8 100644 --- a/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java +++ b/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java @@ -206,7 +206,7 @@ public class TestDataDrivenDBInputFormat extends TestCase { job.getConfiguration().setInt("mapreduce.map.tasks", 2); FileOutputFormat.setOutputPath(job, new Path(OUT_DIR)); DBConfiguration.configureDB(job.getConfiguration(), DRIVER_CLASS, - DB_URL, null, null); + DB_URL, (String) null, (String) null); DataDrivenDBInputFormat.setInput(job, DateCol.class, DATE_TABLE, null, COL, COL); http://git-wip-us.apache.org/repos/asf/sqoop/blob/817195eb/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java b/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java new file mode 100644 index 0000000..cad1004 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/db/TestDBConfiguration.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.db; + +import java.util.Properties; + +import junit.framework.TestCase; + +/** + * Test aspects of DBConfiguration. + */ +public class TestDBConfiguration extends TestCase { + + public void testPropertiesToString() { + Properties connParams = new Properties(); + connParams.setProperty("a", "value-a"); + connParams.setProperty("b", "value-b"); + connParams.setProperty("a.b", "value-a.b"); + connParams.setProperty("a.b.c", "value-a.b.c"); + connParams.setProperty("aaaaaaaaaa.bbbbbbb.cccccccc", "value-abc"); + String result = DBConfiguration.propertiesToString(connParams); + Properties resultParams = DBConfiguration.propertiesFromString(result); + assertEquals("connection params don't match", connParams, resultParams); + + connParams = new Properties(); + connParams.put("conn.timeout", "3000"); + connParams.put("conn.buffer_size", "256"); + connParams.put("conn.dummy", "dummy"); + connParams.put("conn.foo", "bar"); + result = DBConfiguration.propertiesToString(connParams); + resultParams = DBConfiguration.propertiesFromString(result); + assertEquals("connection params don't match", connParams, resultParams); + + connParams = new Properties(); + connParams.put("user", "ABC"); + connParams.put("password", "complex\"pass,word\\123"); + connParams.put("complex\"param,\\name", "dummy"); + connParams.put("conn.buffer=size", "256"); + connParams.put("jdbc.property", "a=b"); + result = DBConfiguration.propertiesToString(connParams); + resultParams = DBConfiguration.propertiesFromString(result); + assertEquals("connection params don't match", connParams, resultParams); + } + +}
