Updated Branches: refs/heads/trunk fb29b8f9f -> b8fd60202
SQOOP-1118: Move PostgreSQL-specific MR code to org.apache.sqoop.mapreduce.postgresql (Masatake Iwasaki via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b8fd6020 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b8fd6020 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b8fd6020 Branch: refs/heads/trunk Commit: b8fd60202865fa5c58c61361c45496e01bf05456 Parents: fb29b8f Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Jul 9 19:45:32 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Jul 9 19:45:32 2013 -0700 ---------------------------------------------------------------------- .../apache/sqoop/manager/PGBulkloadManager.java | 2 +- .../sqoop/mapreduce/PGBulkloadExportJob.java | 209 ------------- .../sqoop/mapreduce/PGBulkloadExportMapper.java | 310 ------------------ .../mapreduce/PGBulkloadExportReducer.java | 107 ------- .../postgresql/PGBulkloadExportJob.java | 210 +++++++++++++ .../postgresql/PGBulkloadExportMapper.java | 311 +++++++++++++++++++ .../postgresql/PGBulkloadExportReducer.java | 108 +++++++ 7 files changed, 630 insertions(+), 627 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/manager/PGBulkloadManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java index 091fd15..04e1443 100644 --- a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java +++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sqoop.mapreduce.ExportInputFormat; -import org.apache.sqoop.mapreduce.PGBulkloadExportJob; +import org.apache.sqoop.mapreduce.postgresql.PGBulkloadExportJob; http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java deleted file mode 100644 index cc60233..0000000 --- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.sqoop.mapreduce; - -import java.io.IOException; -import com.cloudera.sqoop.manager.ExportJobContext; -import com.cloudera.sqoop.util.ExportException; -import com.cloudera.sqoop.SqoopOptions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.sqoop.config.ConfigurationHelper; -import org.apache.sqoop.lib.DelimiterSet; -import org.apache.sqoop.manager.ConnManager; -import org.apache.sqoop.mapreduce.db.DBConfiguration; -import org.apache.sqoop.orm.TableClassName; - - -/** - * Class that runs an export job using pg_bulkload in the mapper. - */ -public class PGBulkloadExportJob extends ExportJobBase { - - public static final Log LOG = - LogFactory.getLog(PGBulkloadExportJob.class.getName()); - - - public PGBulkloadExportJob(final ExportJobContext context) { - super(context); - } - - - public PGBulkloadExportJob(final ExportJobContext ctxt, - final Class<? extends Mapper> mapperClass, - final Class<? extends InputFormat> inputFormatClass, - final Class<? extends OutputFormat> outputFormatClass) { - super(ctxt, mapperClass, inputFormatClass, outputFormatClass); - } - - - @Override - protected void configureInputFormat(Job job, String tableName, - String tableClassName, String splitByCol) - throws ClassNotFoundException, IOException { - super.configureInputFormat(job, tableName, tableClassName, splitByCol); - ConnManager mgr = context.getConnManager(); - String username = options.getUsername(); - if (null == username || username.length() == 0) { - DBConfiguration.configureDB(job.getConfiguration(), - mgr.getDriverClass(), - options.getConnectString(), - options.getFetchSize(), - options.getConnectionParams()); - } else { - DBConfiguration.configureDB(job.getConfiguration(), - mgr.getDriverClass(), - options.getConnectString(), - username, options.getPassword(), - options.getFetchSize(), - options.getConnectionParams()); - } - } - - - @Override - protected Class<? extends Mapper> getMapperClass() { - return PGBulkloadExportMapper.class; - } - - - protected Class<? 
extends Reducer> getReducerClass() { - return PGBulkloadExportReducer.class; - } - - - private void setDelimiter(String prop, char val, Configuration conf) { - switch (val) { - case DelimiterSet.NULL_CHAR: - break; - case '\t': - default: - conf.set(prop, String.valueOf(val)); - } - } - - - @Override - protected void propagateOptionsToJob(Job job) { - super.propagateOptionsToJob(job); - SqoopOptions opts = context.getOptions(); - Configuration conf = job.getConfiguration(); - conf.setIfUnset("pgbulkload.bin", "pg_bulkload"); - if (opts.getNullStringValue() != null) { - conf.set("pgbulkload.null.string", opts.getNullStringValue()); - } - setDelimiter("pgbulkload.input.field.delim", - opts.getInputFieldDelim(), - conf); - setDelimiter("pgbulkload.input.record.delim", - opts.getInputRecordDelim(), - conf); - setDelimiter("pgbulkload.input.enclosedby", - opts.getInputEnclosedBy(), - conf); - setDelimiter("pgbulkload.input.escapedby", - opts.getInputEscapedBy(), - conf); - conf.setBoolean("pgbulkload.input.encloserequired", - opts.isInputEncloseRequired()); - conf.setIfUnset("pgbulkload.check.constraints", "YES"); - conf.setIfUnset("pgbulkload.parse.errors", "INFINITE"); - conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE"); - conf.set("mapred.jar", context.getJarFile()); - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.setInt("mapred.map.max.attempts", 1); - conf.setInt("mapred.reduce.max.attempts", 1); - conf.setIfUnset("mapred.reduce.tasks", "1"); - if (context.getOptions().doClearStagingTable()) { - conf.setBoolean("pgbulkload.clear.staging.table", true); - } - } - - - @Override - public void runExport() throws ExportException, IOException { - ConnManager cmgr = context.getConnManager(); - SqoopOptions options = context.getOptions(); - Configuration conf = options.getConf(); - DBConfiguration dbConf = null; - String outputTableName = context.getTableName(); - String tableName = outputTableName; - String tableClassName = - new TableClassName(options).getClassForTable(outputTableName); - - LOG.info("Beginning export of " + outputTableName); - loadJars(conf, context.getJarFile(), tableClassName); - - try { - Job job = new Job(conf); - dbConf = new DBConfiguration(job.getConfiguration()); - dbConf.setOutputTableName(tableName); - configureInputFormat(job, tableName, tableClassName, null); - configureOutputFormat(job, tableName, tableClassName); - configureNumTasks(job); - propagateOptionsToJob(job); - job.setMapperClass(getMapperClass()); - job.setMapOutputKeyClass(LongWritable.class); - job.setMapOutputValueClass(Text.class); - job.setReducerClass(getReducerClass()); - cacheJars(job, context.getConnManager()); - setJob(job); - - boolean success = runJob(job); - if (!success) { - throw new ExportException("Export job failed!"); - } - } catch (InterruptedException ie) { - throw new IOException(ie); - } catch (ClassNotFoundException cnfe) { - throw new IOException(cnfe); - } finally { - unloadJars(); - } - } - - - @Override - protected int configureNumTasks(Job job) throws IOException { - SqoopOptions options = context.getOptions(); - int numMapTasks = options.getNumMappers(); - if (numMapTasks < 1) { - numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS; - LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); - } - - ConfigurationHelper.setJobNumMaps(job, numMapTasks); - return numMapTasks; - } - - - private void clearStagingTable(DBConfiguration dbConf, String tableName) - throws 
IOException { - // clearing stagingtable is done each mapper tasks - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java deleted file mode 100644 index 81b1333..0000000 --- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java +++ /dev/null @@ -1,310 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.sqoop.lib.SqoopRecord; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.sqoop.mapreduce.db.DBConfiguration; -import org.apache.sqoop.util.LoggingUtils; -import org.apache.sqoop.util.PostgreSQLUtils; -import org.apache.sqoop.util.Executor; -import org.apache.sqoop.util.JdbcUrl; - - -/** - * Mapper that starts a 'pg_bulkload' process and uses that to export rows from - * HDFS to a PostgreSQL database at high speed. - * - * map() methods are actually provided by subclasses that read from - * SequenceFiles (containing existing SqoopRecords) or text files - * (containing delimited lines) and deliver these results to the stream - * used to interface with pg_bulkload. 
- */ -public class PGBulkloadExportMapper - extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> { - private Configuration conf; - private DBConfiguration dbConf; - private Process process; - private OutputStream out; - protected BufferedWriter writer; - private Thread thread; - protected String tmpTableName; - private String tableName; - private String passwordFilename; - - - public PGBulkloadExportMapper() { - } - - - protected void setup(Context context) - throws IOException, InterruptedException { - super.setup(context); - conf = context.getConfiguration(); - dbConf = new DBConfiguration(conf); - tableName = dbConf.getOutputTableName(); - tmpTableName = tableName + "_" + context.getTaskAttemptID().toString(); - - Connection conn = null; - try { - conn = dbConf.getConnection(); - conn.setAutoCommit(false); - if (conf.getBoolean("pgbulkload.clear.staging.table", false)) { - StringBuffer query = new StringBuffer(); - query.append("DROP TABLE IF EXISTS "); - query.append(tmpTableName); - doExecuteUpdate(query.toString()); - } - StringBuffer query = new StringBuffer(); - query.append("CREATE TABLE "); - query.append(tmpTableName); - query.append("(LIKE "); - query.append(tableName); - query.append(" INCLUDING CONSTRAINTS)"); - if (conf.get("pgbulkload.staging.tablespace") != null) { - query.append("TABLESPACE "); - query.append(conf.get("pgbulkload.staging.tablespace")); - } - doExecuteUpdate(query.toString()); - conn.commit(); - } catch (ClassNotFoundException ex) { - LOG.error("Unable to load JDBC driver class", ex); - throw new IOException(ex); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to execute statement", ex); - throw new IOException(ex); - } finally { - try { - conn.close(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to close connection", ex); - } - } - - try { - ArrayList<String> args = new ArrayList<String>(); - List<String> envp = Executor.getCurEnvpStrings(); - args.add(conf.get("pgbulkload.bin", "pg_bulkload")); - args.add("--username=" - + conf.get(DBConfiguration.USERNAME_PROPERTY)); - args.add("--dbname=" - + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY))); - args.add("--host=" - + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY))); - args.add("--port=" - + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY))); - args.add("--input=stdin"); - args.add("--output=" + tmpTableName); - args.add("-o"); - args.add("TYPE=CSV"); - args.add("-o"); - args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ",")); - args.add("-o"); - args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\"")); - args.add("-o"); - args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\"")); - args.add("-o"); - args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints")); - args.add("-o"); - args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors")); - args.add("-o"); - args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors")); - if (conf.get("pgbulkload.null.string") != null) { - args.add("-o"); - args.add("NULL=" + conf.get("pgbulkload.null.string")); - } - if (conf.get("pgbulkload.filter") != null) { - args.add("-o"); - args.add("FILTER=" + conf.get("pgbulkload.filter")); - } - LOG.debug("Starting pg_bulkload with arguments:"); - for (String arg : args) { - LOG.debug(" " + arg); - } - if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) { - String tmpDir = System.getProperty("test.build.data", "/tmp/"); - if (!tmpDir.endsWith(File.separator)) { - tmpDir = 
tmpDir + File.separator; - } - tmpDir = conf.get("job.local.dir", tmpDir); - passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir, - conf.get(DBConfiguration.PASSWORD_PROPERTY)); - envp.add("PGPASSFILE=" + passwordFilename); - } - process = Runtime.getRuntime().exec(args.toArray(new String[0]), - envp.toArray(new String[0])); - out = process.getOutputStream(); - writer = new BufferedWriter(new OutputStreamWriter(out)); - thread = new ReadThread(process.getErrorStream()); - thread.start(); - } catch (Exception e) { - LOG.error("Can't start up pg_bulkload process", e); - cleanup(context); - doExecuteUpdate("DROP TABLE " + tmpTableName); - throw new IOException(e); - } - } - - - public void map(LongWritable key, Writable value, Context context) - throws IOException, InterruptedException { - try { - String str = value.toString(); - if (value instanceof Text) { - writer.write(str, 0, str.length()); - writer.newLine(); - } else if (value instanceof SqoopRecord) { - writer.write(str, 0, str.length()); - } - } catch (Exception e) { - doExecuteUpdate("DROP TABLE " + tmpTableName); - cleanup(context); - throw new IOException(e); - } - } - - - protected void cleanup(Context context) - throws IOException, InterruptedException { - LongWritable taskid = - new LongWritable(context.getTaskAttemptID().getTaskID().getId()); - context.write(taskid, new Text(tmpTableName)); - - if (writer != null) { - writer.close(); - } - if (out != null) { - out.close(); - } - try { - if (thread != null) { - thread.join(); - } - } finally { - // block until the process is done. - if (null != process) { - while (true) { - try { - int returnValue = process.waitFor(); - - // Check pg_bulkload's process return value - if (returnValue != 0) { - throw new RuntimeException( - "Unexpected return value from pg_bulkload: "+ returnValue); - } - } catch (InterruptedException ie) { - // interrupted; loop around. - LOG.debug("Caught interrupted exception waiting for process " - + "pg_bulkload.bin to exit"); - //Clear the interrupted flag. 
We have to call Thread.interrupted - //to clear for interrupted exceptions from process.waitFor - //See http://bugs.sun.com/view_bug.do?bug_id=6420270 for more info - Thread.interrupted(); - continue; - } - break; - } - } - } - if (null != passwordFilename) { - if (!new File(passwordFilename).delete()) { - LOG.error("Could not remove postgresql password file " - + passwordFilename); - LOG.error("You should remove this file to protect your credentials."); - } - } - } - - - protected int doExecuteUpdate(String query) throws IOException { - Connection conn = null; - try { - conn = dbConf.getConnection(); - conn.setAutoCommit(false); - } catch (ClassNotFoundException ex) { - LOG.error("Unable to load JDBC driver class", ex); - throw new IOException(ex); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to connect to database", ex); - throw new IOException(ex); - } - Statement stmt = null; - try { - stmt = conn.createStatement(); - int ret = stmt.executeUpdate(query); - conn.commit(); - return ret; - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to execute query: " + query, ex); - throw new IOException(ex); - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to close statement", ex); - } - } - try { - conn.close(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to close connection", ex); - } - } - } - - - private class ReadThread extends Thread { - private InputStream in; - - ReadThread(InputStream in) { - this.in = in; - } - - public void run() { - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - String line = null; - try { - while((line = reader.readLine()) != null) { - System.out.println(line); - } - reader.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java deleted file mode 100644 index 6f55861..0000000 --- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sqoop.mapreduce; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.sqoop.mapreduce.db.DBConfiguration; -import org.apache.sqoop.util.LoggingUtils; - - -/** - * Reducer for transfering data from temporary table to destination. - * Reducer drops all temporary tables if all data successfully transfered. - * Temporary tables is not dropptd in error case for manual retry. - */ -public class PGBulkloadExportReducer - extends AutoProgressReducer<LongWritable, Text, - NullWritable, NullWritable> { - - public static final Log LOG = - LogFactory.getLog(PGBulkloadExportReducer.class.getName()); - private Configuration conf; - private DBConfiguration dbConf; - private Connection conn; - private String tableName; - - - protected void setup(Context context) - throws IOException, InterruptedException { - conf = context.getConfiguration(); - dbConf = new DBConfiguration(conf); - tableName = dbConf.getOutputTableName(); - try { - conn = dbConf.getConnection(); - conn.setAutoCommit(false); - } catch (ClassNotFoundException ex) { - LOG.error("Unable to load JDBC driver class", ex); - throw new IOException(ex); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to connect to database", ex); - throw new IOException(ex); - } - } - - - @Override - public void reduce(LongWritable key, Iterable<Text> values, Context context) - throws IOException, InterruptedException { - Statement stmt = null; - try { - stmt = conn.createStatement(); - for (Text value : values) { - int inserted = stmt.executeUpdate("INSERT INTO " + tableName - + " ( SELECT * FROM " + value + " )"); - stmt.executeUpdate("DROP TABLE " + value); - } - conn.commit(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to execute create query.", ex); - throw new IOException(ex); - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to close statement", ex); - } - } - } - } - - - protected void cleanup(Context context) - throws IOException, InterruptedException { - try { - conn.close(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex); - throw new IOException(ex); - } - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java new file mode 100644 index 0000000..79fb7da --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.postgresql; + +import java.io.IOException; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.cloudera.sqoop.util.ExportException; +import com.cloudera.sqoop.SqoopOptions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.sqoop.config.ConfigurationHelper; +import org.apache.sqoop.lib.DelimiterSet; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.mapreduce.ExportJobBase; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.orm.TableClassName; + + +/** + * Class that runs an export job using pg_bulkload in the mapper. + */ +public class PGBulkloadExportJob extends ExportJobBase { + + public static final Log LOG = + LogFactory.getLog(PGBulkloadExportJob.class.getName()); + + + public PGBulkloadExportJob(final ExportJobContext context) { + super(context); + } + + + public PGBulkloadExportJob(final ExportJobContext ctxt, + final Class<? extends Mapper> mapperClass, + final Class<? extends InputFormat> inputFormatClass, + final Class<? extends OutputFormat> outputFormatClass) { + super(ctxt, mapperClass, inputFormatClass, outputFormatClass); + } + + + @Override + protected void configureInputFormat(Job job, String tableName, + String tableClassName, String splitByCol) + throws ClassNotFoundException, IOException { + super.configureInputFormat(job, tableName, tableClassName, splitByCol); + ConnManager mgr = context.getConnManager(); + String username = options.getUsername(); + if (null == username || username.length() == 0) { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + options.getFetchSize(), + options.getConnectionParams()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + username, options.getPassword(), + options.getFetchSize(), + options.getConnectionParams()); + } + } + + + @Override + protected Class<? extends Mapper> getMapperClass() { + return PGBulkloadExportMapper.class; + } + + + protected Class<? 
extends Reducer> getReducerClass() { + return PGBulkloadExportReducer.class; + } + + + private void setDelimiter(String prop, char val, Configuration conf) { + switch (val) { + case DelimiterSet.NULL_CHAR: + break; + case '\t': + default: + conf.set(prop, String.valueOf(val)); + } + } + + + @Override + protected void propagateOptionsToJob(Job job) { + super.propagateOptionsToJob(job); + SqoopOptions opts = context.getOptions(); + Configuration conf = job.getConfiguration(); + conf.setIfUnset("pgbulkload.bin", "pg_bulkload"); + if (opts.getNullStringValue() != null) { + conf.set("pgbulkload.null.string", opts.getNullStringValue()); + } + setDelimiter("pgbulkload.input.field.delim", + opts.getInputFieldDelim(), + conf); + setDelimiter("pgbulkload.input.record.delim", + opts.getInputRecordDelim(), + conf); + setDelimiter("pgbulkload.input.enclosedby", + opts.getInputEnclosedBy(), + conf); + setDelimiter("pgbulkload.input.escapedby", + opts.getInputEscapedBy(), + conf); + conf.setBoolean("pgbulkload.input.encloserequired", + opts.isInputEncloseRequired()); + conf.setIfUnset("pgbulkload.check.constraints", "YES"); + conf.setIfUnset("pgbulkload.parse.errors", "INFINITE"); + conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE"); + conf.set("mapred.jar", context.getJarFile()); + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + conf.setInt("mapred.map.max.attempts", 1); + conf.setInt("mapred.reduce.max.attempts", 1); + conf.setIfUnset("mapred.reduce.tasks", "1"); + if (context.getOptions().doClearStagingTable()) { + conf.setBoolean("pgbulkload.clear.staging.table", true); + } + } + + + @Override + public void runExport() throws ExportException, IOException { + ConnManager cmgr = context.getConnManager(); + SqoopOptions options = context.getOptions(); + Configuration conf = options.getConf(); + DBConfiguration dbConf = null; + String outputTableName = context.getTableName(); + String tableName = outputTableName; + String tableClassName = + new TableClassName(options).getClassForTable(outputTableName); + + LOG.info("Beginning export of " + outputTableName); + loadJars(conf, context.getJarFile(), tableClassName); + + try { + Job job = new Job(conf); + dbConf = new DBConfiguration(job.getConfiguration()); + dbConf.setOutputTableName(tableName); + configureInputFormat(job, tableName, tableClassName, null); + configureOutputFormat(job, tableName, tableClassName); + configureNumTasks(job); + propagateOptionsToJob(job); + job.setMapperClass(getMapperClass()); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(Text.class); + job.setReducerClass(getReducerClass()); + cacheJars(job, context.getConnManager()); + setJob(job); + + boolean success = runJob(job); + if (!success) { + throw new ExportException("Export job failed!"); + } + } catch (InterruptedException ie) { + throw new IOException(ie); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } finally { + unloadJars(); + } + } + + + @Override + protected int configureNumTasks(Job job) throws IOException { + SqoopOptions options = context.getOptions(); + int numMapTasks = options.getNumMappers(); + if (numMapTasks < 1) { + numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS; + LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); + } + + ConfigurationHelper.setJobNumMaps(job, numMapTasks); + return numMapTasks; + } + + + private void clearStagingTable(DBConfiguration dbConf, String tableName) + throws 
IOException { + // Clearing the staging table is done by each mapper task. + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java new file mode 100644 index 0000000..333546f --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.postgresql; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.sqoop.mapreduce.AutoProgressMapper; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.LoggingUtils; +import org.apache.sqoop.util.PostgreSQLUtils; +import org.apache.sqoop.util.Executor; +import org.apache.sqoop.util.JdbcUrl; + + +/** + * Mapper that starts a 'pg_bulkload' process and uses that to export rows from + * HDFS to a PostgreSQL database at high speed. + * + * map() methods are actually provided by subclasses that read from + * SequenceFiles (containing existing SqoopRecords) or text files + * (containing delimited lines) and deliver these results to the stream + * used to interface with pg_bulkload.
+ */ +public class PGBulkloadExportMapper + extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> { + private Configuration conf; + private DBConfiguration dbConf; + private Process process; + private OutputStream out; + protected BufferedWriter writer; + private Thread thread; + protected String tmpTableName; + private String tableName; + private String passwordFilename; + + + public PGBulkloadExportMapper() { + } + + + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + conf = context.getConfiguration(); + dbConf = new DBConfiguration(conf); + tableName = dbConf.getOutputTableName(); + tmpTableName = tableName + "_" + context.getTaskAttemptID().toString(); + + Connection conn = null; + try { + conn = dbConf.getConnection(); + conn.setAutoCommit(false); + if (conf.getBoolean("pgbulkload.clear.staging.table", false)) { + StringBuffer query = new StringBuffer(); + query.append("DROP TABLE IF EXISTS "); + query.append(tmpTableName); + doExecuteUpdate(query.toString()); + } + StringBuffer query = new StringBuffer(); + query.append("CREATE TABLE "); + query.append(tmpTableName); + query.append("(LIKE "); + query.append(tableName); + query.append(" INCLUDING CONSTRAINTS)"); + if (conf.get("pgbulkload.staging.tablespace") != null) { + query.append("TABLESPACE "); + query.append(conf.get("pgbulkload.staging.tablespace")); + } + doExecuteUpdate(query.toString()); + conn.commit(); + } catch (ClassNotFoundException ex) { + LOG.error("Unable to load JDBC driver class", ex); + throw new IOException(ex); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute statement", ex); + throw new IOException(ex); + } finally { + try { + conn.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to close connection", ex); + } + } + + try { + ArrayList<String> args = new ArrayList<String>(); + List<String> envp = Executor.getCurEnvpStrings(); + args.add(conf.get("pgbulkload.bin", "pg_bulkload")); + args.add("--username=" + + conf.get(DBConfiguration.USERNAME_PROPERTY)); + args.add("--dbname=" + + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY))); + args.add("--host=" + + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY))); + args.add("--port=" + + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY))); + args.add("--input=stdin"); + args.add("--output=" + tmpTableName); + args.add("-o"); + args.add("TYPE=CSV"); + args.add("-o"); + args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ",")); + args.add("-o"); + args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\"")); + args.add("-o"); + args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\"")); + args.add("-o"); + args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints")); + args.add("-o"); + args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors")); + args.add("-o"); + args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors")); + if (conf.get("pgbulkload.null.string") != null) { + args.add("-o"); + args.add("NULL=" + conf.get("pgbulkload.null.string")); + } + if (conf.get("pgbulkload.filter") != null) { + args.add("-o"); + args.add("FILTER=" + conf.get("pgbulkload.filter")); + } + LOG.debug("Starting pg_bulkload with arguments:"); + for (String arg : args) { + LOG.debug(" " + arg); + } + if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) { + String tmpDir = System.getProperty("test.build.data", "/tmp/"); + if (!tmpDir.endsWith(File.separator)) { + tmpDir = 
tmpDir + File.separator; + } + tmpDir = conf.get("job.local.dir", tmpDir); + passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir, + conf.get(DBConfiguration.PASSWORD_PROPERTY)); + envp.add("PGPASSFILE=" + passwordFilename); + } + process = Runtime.getRuntime().exec(args.toArray(new String[0]), + envp.toArray(new String[0])); + out = process.getOutputStream(); + writer = new BufferedWriter(new OutputStreamWriter(out)); + thread = new ReadThread(process.getErrorStream()); + thread.start(); + } catch (Exception e) { + LOG.error("Can't start up pg_bulkload process", e); + cleanup(context); + doExecuteUpdate("DROP TABLE " + tmpTableName); + throw new IOException(e); + } + } + + + public void map(LongWritable key, Writable value, Context context) + throws IOException, InterruptedException { + try { + String str = value.toString(); + if (value instanceof Text) { + writer.write(str, 0, str.length()); + writer.newLine(); + } else if (value instanceof SqoopRecord) { + writer.write(str, 0, str.length()); + } + } catch (Exception e) { + doExecuteUpdate("DROP TABLE " + tmpTableName); + cleanup(context); + throw new IOException(e); + } + } + + + protected void cleanup(Context context) + throws IOException, InterruptedException { + LongWritable taskid = + new LongWritable(context.getTaskAttemptID().getTaskID().getId()); + context.write(taskid, new Text(tmpTableName)); + + if (writer != null) { + writer.close(); + } + if (out != null) { + out.close(); + } + try { + if (thread != null) { + thread.join(); + } + } finally { + // block until the process is done. + if (null != process) { + while (true) { + try { + int returnValue = process.waitFor(); + + // Check pg_bulkload's process return value + if (returnValue != 0) { + throw new RuntimeException( + "Unexpected return value from pg_bulkload: "+ returnValue); + } + } catch (InterruptedException ie) { + // interrupted; loop around. + LOG.debug("Caught interrupted exception waiting for process " + + "pg_bulkload.bin to exit"); + //Clear the interrupted flag. 
We have to call Thread.interrupted + //to clear for interrupted exceptions from process.waitFor + //See http://bugs.sun.com/view_bug.do?bug_id=6420270 for more info + Thread.interrupted(); + continue; + } + break; + } + } + } + if (null != passwordFilename) { + if (!new File(passwordFilename).delete()) { + LOG.error("Could not remove postgresql password file " + + passwordFilename); + LOG.error("You should remove this file to protect your credentials."); + } + } + } + + + protected int doExecuteUpdate(String query) throws IOException { + Connection conn = null; + try { + conn = dbConf.getConnection(); + conn.setAutoCommit(false); + } catch (ClassNotFoundException ex) { + LOG.error("Unable to load JDBC driver class", ex); + throw new IOException(ex); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to connect to database", ex); + throw new IOException(ex); + } + Statement stmt = null; + try { + stmt = conn.createStatement(); + int ret = stmt.executeUpdate(query); + conn.commit(); + return ret; + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute query: " + query, ex); + throw new IOException(ex); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to close statement", ex); + } + } + try { + conn.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to close connection", ex); + } + } + } + + + private class ReadThread extends Thread { + private InputStream in; + + ReadThread(InputStream in) { + this.in = in; + } + + public void run() { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line = null; + try { + while((line = reader.readLine()) != null) { + System.out.println(line); + } + reader.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java new file mode 100644 index 0000000..3dc05a7 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */ + +package org.apache.sqoop.mapreduce.postgresql; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.sqoop.mapreduce.AutoProgressReducer; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.LoggingUtils; + + +/** + * Reducer for transferring data from the temporary tables to the destination table. + * The reducer drops all temporary tables once all data has been transferred successfully. + * Temporary tables are not dropped in the error case, to allow manual retry. + */ +public class PGBulkloadExportReducer + extends AutoProgressReducer<LongWritable, Text, + NullWritable, NullWritable> { + + public static final Log LOG = + LogFactory.getLog(PGBulkloadExportReducer.class.getName()); + private Configuration conf; + private DBConfiguration dbConf; + private Connection conn; + private String tableName; + + + protected void setup(Context context) + throws IOException, InterruptedException { + conf = context.getConfiguration(); + dbConf = new DBConfiguration(conf); + tableName = dbConf.getOutputTableName(); + try { + conn = dbConf.getConnection(); + conn.setAutoCommit(false); + } catch (ClassNotFoundException ex) { + LOG.error("Unable to load JDBC driver class", ex); + throw new IOException(ex); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to connect to database", ex); + throw new IOException(ex); + } + } + + + @Override + public void reduce(LongWritable key, Iterable<Text> values, Context context) + throws IOException, InterruptedException { + Statement stmt = null; + try { + stmt = conn.createStatement(); + for (Text value : values) { + int inserted = stmt.executeUpdate("INSERT INTO " + tableName + + " ( SELECT * FROM " + value + " )"); + stmt.executeUpdate("DROP TABLE " + value); + } + conn.commit(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute insert query.", ex); + throw new IOException(ex); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to close statement", ex); + } + } + } + } + + + protected void cleanup(Context context) + throws IOException, InterruptedException { + try { + conn.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to close connection", ex); + throw new IOException(ex); + } + } + +}
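
----------------------------------------------------------------------

Note for downstream users: this commit is a pure package move. The classes themselves are unchanged; only the import path differs (org.apache.sqoop.mapreduce -> org.apache.sqoop.mapreduce.postgresql), exactly as the PGBulkloadManager hunk above shows. Below is a minimal sketch of the corresponding fix-up in out-of-tree code; the class MyPgExportTool is hypothetical and stands in for any code that referenced the old package.

    // Before this commit:
    //   import org.apache.sqoop.mapreduce.PGBulkloadExportJob;
    // After this commit:
    import org.apache.sqoop.mapreduce.postgresql.PGBulkloadExportJob;

    import com.cloudera.sqoop.manager.ExportJobContext;
    import com.cloudera.sqoop.util.ExportException;
    import java.io.IOException;

    // Hypothetical caller; the constructor and runExport() behavior are
    // unchanged by the move, so only the import needs updating.
    public class MyPgExportTool {
      public void run(ExportJobContext context)
          throws ExportException, IOException {
        PGBulkloadExportJob job = new PGBulkloadExportJob(context);
        job.runExport();
      }
    }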
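For context on the moved code: each map task creates a per-attempt staging table named <table>_<task-attempt-id> with CREATE TABLE ... (LIKE <table> INCLUDING CONSTRAINTS), streams its rows to a pg_bulkload process reading from stdin, and emits the staging table name; a single reducer then folds every staging table into the destination table and drops it. On failure, the staging tables are deliberately left in place so the merge can be retried by hand. A sketch of that manual merge follows, assuming plain JDBC; the connection details and table names are illustrative only.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ManualStagingMerge {
      public static void main(String[] args) throws Exception {
        // Hypothetical connection parameters; substitute your own.
        try (Connection conn = DriverManager.getConnection(
            "jdbc:postgresql://dbhost:5432/mydb", "sqoop", "secret")) {
          conn.setAutoCommit(false);
          try (Statement stmt = conn.createStatement()) {
            // The same statement pair the reducer issues, repeated for each
            // leftover staging table (the table name below is illustrative).
            stmt.executeUpdate("INSERT INTO mytable "
                + "( SELECT * FROM mytable_attempt_201307091945_0001_m_000000_0 )");
            stmt.executeUpdate(
                "DROP TABLE mytable_attempt_201307091945_0001_m_000000_0");
          }
          conn.commit();
        }
      }
    }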
