Updated Branches: refs/heads/trunk a2a02076a -> fb29b8f9f
SQOOP-999: Support bulk load from HDFS to PostgreSQL using COPY ... FROM (Masatake Iwasaki via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fb29b8f9 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fb29b8f9 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fb29b8f9 Branch: refs/heads/trunk Commit: fb29b8f9fcd45c98857fe44cfc3fe294f2fc6f84 Parents: a2a0207 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Jul 3 18:38:14 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Jul 3 18:38:14 2013 -0700 ---------------------------------------------------------------------- ivy.xml | 3 + ivy/libraries.properties | 1 + .../sqoop/manager/DirectPostgresqlManager.java | 21 ++- .../postgresql/PostgreSQLCopyExportJob.java | 110 +++++++++++++ .../postgresql/PostgreSQLCopyExportMapper.java | 160 +++++++++++++++++++ .../DirectPostgreSQLExportManualTest.java | 160 +++++++++++++++++++ 6 files changed, 453 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/ivy.xml ---------------------------------------------------------------------- diff --git a/ivy.xml b/ivy.xml index 750adfc..63fdc80 100644 --- a/ivy.xml +++ b/ivy.xml @@ -182,6 +182,9 @@ under the License. <artifact name="hcatalog-core" type="jar"/> </dependency> + <dependency org="org.postgresql" name="postgresql" + rev="${postgresql.version}" conf="common->default" /> + <exclude org="org.apache.hadoop" module="avro"/> <exclude org="commons-daemon" module="commons-daemon" /> <exclude type="pom" /> http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/ivy/libraries.properties ---------------------------------------------------------------------- diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 430d554..df1a08f 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -42,3 +42,4 @@ mvn.version=2.0.10 rats-lib.version=0.5.1 +postgresql.version=9.2-1003-jdbc4 http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java index c085218..8d4a097 100644 --- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java @@ -35,21 +35,27 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.sqoop.cli.RelatedOptions; +import org.apache.sqoop.mapreduce.ExportInputFormat; +import org.apache.sqoop.mapreduce.postgresql.PostgreSQLCopyExportJob; import org.apache.sqoop.util.PostgreSQLUtils; +import org.apache.sqoop.util.SubstitutionUtils; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.io.SplittableBufferedWriter; +import com.cloudera.sqoop.manager.ExportJobContext; import com.cloudera.sqoop.util.AsyncSink; import com.cloudera.sqoop.util.DirectImportUtils; import com.cloudera.sqoop.util.ErrorableAsyncSink; import com.cloudera.sqoop.util.ErrorableThread; +import com.cloudera.sqoop.util.ExportException; import com.cloudera.sqoop.util.Executor; import com.cloudera.sqoop.util.ImportException; import com.cloudera.sqoop.util.JdbcUrl; import com.cloudera.sqoop.util.LoggingAsyncSink; import com.cloudera.sqoop.util.PerfCounters; -import org.apache.sqoop.util.SubstitutionUtils; + /** * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT @@ -532,7 +538,7 @@ public class DirectPostgresqlManager @Override public boolean supportsStagingForExport() { - return false; + return true; } // CHECKSTYLE:ON @@ -569,4 +575,15 @@ public class DirectPostgresqlManager return extraOptions; } + + public void exportTable(ExportJobContext context) + throws IOException, ExportException { + context.setConnManager(this); + PostgreSQLCopyExportJob job = + new PostgreSQLCopyExportJob(context, + null, + ExportInputFormat.class, + NullOutputFormat.class); + job.runExport(); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java new file mode 100644 index 0000000..483949f --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.postgresql; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.config.ConfigurationHelper; +import com.cloudera.sqoop.manager.ExportJobContext; +import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.sqoop.lib.DelimiterSet; +import org.apache.sqoop.mapreduce.JdbcExportJob; + + + +/** + * Run an export using PostgreSQL JDBC Copy API. + */ +public class PostgreSQLCopyExportJob extends JdbcExportJob { + public static final Log LOG = + LogFactory.getLog(PostgreSQLCopyExportJob.class.getName()); + + public PostgreSQLCopyExportJob(final ExportJobContext context) { + super(context); + } + + public PostgreSQLCopyExportJob(final ExportJobContext ctxt, + final Class<? extends Mapper> mapperClass, + final Class<? extends InputFormat> inputFormatClass, + final Class<? extends OutputFormat> outputFormatClass) { + super(ctxt, mapperClass, inputFormatClass, outputFormatClass); + } + + @Override + protected Class<? extends Mapper> getMapperClass() { + return PostgreSQLCopyExportMapper.class; + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + if (isHCatJob) { + throw new IOException("Sqoop-HCatalog Integration is not supported."); + } + switch (getInputFileType()) { + case AVRO_DATA_FILE: + throw new IOException("Avro data file is not supported."); + case SEQUENCE_FILE: + case UNKNOWN: + default: + job.setMapperClass(getMapperClass()); + } + + // Concurrent writes of the same records would be problematic. + ConfigurationHelper.setJobMapSpeculativeExecution(job, false); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(NullWritable.class); + } + + protected void propagateOptionsToJob(Job job) { + super.propagateOptionsToJob(job); + SqoopOptions opts = context.getOptions(); + Configuration conf = job.getConfiguration(); + if (opts.getNullStringValue() != null) { + conf.set("postgresql.null.string", opts.getNullStringValue()); + } + setDelimiter("postgresql.input.field.delim", + opts.getInputFieldDelim(), conf); + setDelimiter("postgresql.input.record.delim", + opts.getInputRecordDelim(), conf); + setDelimiter("postgresql.input.enclosedby", + opts.getInputEnclosedBy(), conf); + setDelimiter("postgresql.input.escapedby", + opts.getInputEscapedBy(), conf); + conf.setBoolean("postgresql.input.encloserequired", + opts.isInputEncloseRequired()); + } + + private void setDelimiter(String prop, char val, Configuration conf) { + switch (val) { + case DelimiterSet.NULL_CHAR: + break; + case '\t': + default: + conf.set(prop, String.valueOf(val)); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java new file mode 100644 index 0000000..d10cadb --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.postgresql; + +import com.cloudera.sqoop.lib.DelimiterSet; +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.sqoop.mapreduce.AutoProgressMapper; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.LoggingUtils; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; +import org.postgresql.copy.CopyIn; + + +/** + * Mapper that export rows from HDFS to a PostgreSQL database at high speed + * with PostgreSQL Copy API. + * + * map() methods read from SequenceFiles (containing existing SqoopRecords) + * or text files (containing delimited lines) + * and deliver these results to the CopyIn object of PostgreSQL JDBC. + */ +public class PostgreSQLCopyExportMapper + extends AutoProgressMapper<LongWritable, Writable, + NullWritable, NullWritable> { + public static final Log LOG = + LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName()); + + private Configuration conf; + private DBConfiguration dbConf; + private Connection conn = null; + private CopyIn copyin = null; + private StringBuilder line = new StringBuilder(); + private DelimiterSet delimiters = + new DelimiterSet(',', '\n', + DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false); + + public PostgreSQLCopyExportMapper() { + } + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + + super.setup(context); + conf = context.getConfiguration(); + dbConf = new DBConfiguration(conf); + CopyManager cm = null; + try { + conn = dbConf.getConnection(); + cm = ((PGConnection)conn).getCopyAPI(); + } catch (ClassNotFoundException ex) { + LOG.error("Unable to load JDBC driver class", ex); + throw new IOException(ex); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex); + throw new IOException(ex); + } + try { + StringBuilder sql = new StringBuilder(); + sql.append("COPY "); + sql.append(dbConf.getOutputTableName()); + sql.append(" FROM STDIN WITH ("); + sql.append(" ENCODING 'UTF-8' "); + sql.append(", FORMAT csv "); + sql.append(", DELIMITER "); + sql.append("'"); + sql.append(conf.get("postgresql.input.field.delim", ",")); + sql.append("'"); + sql.append(", QUOTE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.enclosedby", "\"")); + sql.append("'"); + sql.append(", ESCAPE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.escapedby", "\"")); + sql.append("'"); + if (conf.get("postgresql.null.string") != null) { + sql.append(", NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } + sql.append(")"); + LOG.debug("Starting export with copy: " + sql); + copyin = cm.copyIn(sql.toString()); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex); + close(); + throw new IOException(ex); + } + } + + @Override + public void map(LongWritable key, Writable value, Context context) + throws IOException, InterruptedException { + line.setLength(0); + line.append(value.toString()); + if (value instanceof Text) { + line.append(System.getProperty("line.separator")); + } + try { + byte[]data = line.toString().getBytes("UTF-8"); + copyin.writeToCopy(data, 0, data.length); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute copy", ex); + close(); + throw new IOException(ex); + } + } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + try { + copyin.endCopy(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to finalize copy", ex); + throw new IOException(ex); + } + close(); + } + + void close() throws IOException { + if (conn != null) { + try { + conn.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to close connection", ex); + throw new IOException(ex); + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fb29b8f9/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java new file mode 100644 index 0000000..52095ef --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/DirectPostgreSQLExportManualTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.manager; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.PreparedStatement; +import java.util.Arrays; +import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import com.cloudera.sqoop.TestExport; +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; + + +/** + * Test the DirectPostgresqlManager implementations. + * DirectPostgresqlManager uses JDBC driver to facilitate it. + * + * Since this requires a Postgresql installation on your local machine to use, + * this class is named in such a way that Hadoop's default QA process does not + * run it. + * + * You need to run this manually with + * -Dtestcase=DirectPostgreSQLExportManualTest. + * + * You need to put Postgresql's JDBC driver library into lib dir. + * + * You need to create a sqooptest superuser and database and tablespace, + * + * $ sudo -u postgres createuser -U postgres -s sqooptest + * $ sudo -u postgres createdb -U sqooptest sqooptest + * $ psql -U sqooptest sqooptest + * + */ +public class DirectPostgreSQLExportManualTest extends TestExport { + + public static final Log LOG = + LogFactory.getLog(DirectPostgreSQLExportManualTest.class.getName()); + private DBConfiguration dbConf; + + static final String HOST_URL = + System.getProperty("sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + static final String DATABASE = + System.getProperty("sqoop.test.postgresql.database", "sqooptest"); + static final String USERNAME = + System.getProperty("sqoop.test.postgresql.username", "sqooptest"); + static final String CONNECT_STRING = HOST_URL + DATABASE; + + public DirectPostgreSQLExportManualTest() { + JobConf conf = new JobConf(getConf()); + DBConfiguration.configureDB(conf, + "org.postgresql.Driver", + getConnectString(), + getUserName(), + (String) null, (Integer) null); + dbConf = new DBConfiguration(conf); + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return CONNECT_STRING; + } + + protected String getUserName() { + return USERNAME; + } + + @Override + protected String getTablePrefix() { + return super.getTablePrefix().toLowerCase(); + } + + @Override + protected String getTableName() { + return super.getTableName().toLowerCase(); + } + + @Override + public String getStagingTableName() { + return super.getStagingTableName().toLowerCase(); + } + + @Override + protected Connection getConnection() { + try { + Connection conn = dbConf.getConnection(); + conn.setAutoCommit(false); + PreparedStatement stmt = + conn.prepareStatement("SET extra_float_digits TO 0"); + stmt.executeUpdate(); + conn.commit(); + return conn; + } catch (SQLException sqlE) { + LOG.error("Could not get connection to test server: " + sqlE); + return null; + } catch (ClassNotFoundException cnfE) { + LOG.error("Could not find driver class: " + cnfE); + return null; + } + } + + @Override + protected String getDropTableStatement(String tableName) { + return "DROP TABLE IF EXISTS " + tableName; + } + + @Override + protected String[] getArgv(boolean includeHadoopFlags, + int rowsPerStatement, + int statementsPerTx, + String... additionalArgv) { + ArrayList<String> args = + new ArrayList<String>(Arrays.asList(additionalArgv)); + args.add("--username"); + args.add(getUserName()); + args.add("--direct"); + return super.getArgv(includeHadoopFlags, + rowsPerStatement, + statementsPerTx, + args.toArray(new String[0])); + } + + @Override + protected String [] getCodeGenArgv(String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs)); + args.add("--username"); + args.add(getUserName()); + return super.getCodeGenArgv(args.toArray(new String[0])); + } + + @Override + public void testColumnsExport() throws IOException, SQLException { + // Direct export does not support --columns option. + } +}
