Updated Branches: refs/heads/trunk 482f39253 -> 20af67ef6
SQOOP-749: Exports Using Stored Procedures (Functions) (Nick White via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/20af67ef Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/20af67ef Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/20af67ef Branch: refs/heads/trunk Commit: 20af67ef60096b17e1d9585670e5ec787eb760e2 Parents: 482f392 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Jan 22 12:24:40 2013 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Jan 22 12:24:40 2013 -0800 ---------------------------------------------------------------------- ivy.xml | 5 + ivy/libraries.properties | 2 + src/docs/user/export-purpose.txt | 5 +- src/docs/user/export.txt | 20 +- src/java/org/apache/sqoop/SqoopOptions.java | 9 + src/java/org/apache/sqoop/manager/ConnManager.java | 31 ++ src/java/org/apache/sqoop/manager/SqlManager.java | 90 ++++ .../sqoop/mapreduce/ExportCallOutputFormat.java | 154 +++++++ .../apache/sqoop/mapreduce/JdbcCallExportJob.java | 104 +++++ .../org/apache/sqoop/mapreduce/JdbcExportJob.java | 11 +- src/java/org/apache/sqoop/orm/ClassWriter.java | 10 +- src/java/org/apache/sqoop/tool/BaseSqoopTool.java | 1 + src/java/org/apache/sqoop/tool/ExportTool.java | 37 ++- src/test/com/cloudera/sqoop/SmokeTests.java | 3 + src/test/com/cloudera/sqoop/TestConnFactory.java | 10 + src/test/com/cloudera/sqoop/TestExport.java | 13 +- .../sqoop/manager/PostgresqlExportTest.java | 136 +++++- .../cloudera/sqoop/testutil/ExportJobTestCase.java | 10 +- .../org/apache/sqoop/TestExportUsingProcedure.java | 326 +++++++++++++++ 19 files changed, 940 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/ivy.xml ---------------------------------------------------------------------- diff --git a/ivy.xml b/ivy.xml index 1ee60df..1fa4dd1 100644 
--- a/ivy.xml +++ b/ivy.xml @@ -124,6 +124,11 @@ under the License. conf="common->master" /> <dependency org="junit" name="junit" rev="${junit.version}" conf="test->default"/> + <!-- We're only using H2 for tests as it supports stored + procedures; once we move to HSQLDB 2.x we can drop + this --> + <dependency org="com.h2database" name="h2" rev="${h2.version}" + conf="test->default"/> <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}" conf="common->default;redist->default"/> <dependency org="commons-io" name="commons-io" rev="${commons-io.version}" http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/ivy/libraries.properties ---------------------------------------------------------------------- diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 4c9e37d..430d554 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -34,6 +34,8 @@ ivy.version=2.1.0 junit.version=4.5 +h2.version=1.3.170 + log4j.version=1.2.16 mvn.version=2.0.10 http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/docs/user/export-purpose.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/export-purpose.txt b/src/docs/user/export-purpose.txt index c26eaa7..def6ead 100644 --- a/src/docs/user/export-purpose.txt +++ b/src/docs/user/export-purpose.txt @@ -25,6 +25,5 @@ user-specified delimiters. The default operation is to transform these into a set of +INSERT+ statements that inject the records into the database. In "update mode," Sqoop will generate +UPDATE+ statements that replace existing records -in the database. - - +in the database, and in "call mode" Sqoop will make a stored procedure +call for each record. 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/docs/user/export.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/export.txt b/src/docs/user/export.txt index 9f600fe..8b9e473 100644 --- a/src/docs/user/export.txt +++ b/src/docs/user/export.txt @@ -54,6 +54,7 @@ Argument Description +-m,\--num-mappers <n>+ Use 'n' map tasks to export in\ parallel +\--table <table-name>+ Table to populate ++\--call <stored-proc-name>+ Stored Procedure to call +\--update-key <col-name>+ Anchor column to use for updates.\ Use a comma separated list of columns\ if there are more than one column. @@ -76,9 +77,10 @@ Argument Description statement execution. ------------------------------------------------------------------------ -The +\--table+ and +\--export-dir+ arguments are required. These -specify the table to populate in the database, and the -directory in HDFS that contains the source data. +The +\--export-dir+ argument and one of +\--table+ or +\--call+ are + required. These specify the table to populate in the database (or the + stored procedure to call), and the directory in HDFS that contains + the source data. You can control the number of mappers independently from the number of files present in the directory. Export performance depends on the @@ -126,7 +128,8 @@ specified, Sqoop will delete all of the data before starting the export job. NOTE: Support for staging data prior to pushing it into the destination table is not available for +--direct+ exports. It is also not available when -export is invoked using the +--update-key+ option for updating existing data. +export is invoked using the +--update-key+ option for updating existing data, +and when stored procedures are used to insert the data. Inserts vs. 
Updates @@ -275,3 +278,12 @@ Another basic export to populate a table named +bar+ with validation enabled: $ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar \ --export-dir /results/bar_data --validate ---- + +An export that calls a stored procedure named +barproc+ for every record in ++/results/bar_data+ would look like: + +---- +$ sqoop export --connect jdbc:mysql://db.example.com/foo --call barproc \ + --export-dir /results/bar_data +---- + http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index b0fdfa0..addc889 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -99,6 +99,7 @@ public class SqoopOptions implements Cloneable { @StoredAsProperty("db.username") private String username; @StoredAsProperty("db.export.staging.table") private String stagingTableName; @StoredAsProperty("db.clear.staging.table") private boolean clearStagingTable; + @StoredAsProperty("db.export.call") private String call; private Properties connectionParams; //Properties stored as db.connect.params @@ -2012,4 +2013,12 @@ public class SqoopOptions implements Cloneable { Class validationFailureHandlerClazz) { this.validationFailureHandlerClass = validationFailureHandlerClazz; } + + public String getCall() { + return call; + } + + public void setCall(String theCall) { + this.call = theCall; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/manager/ConnManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index 358981e..1b32dc9 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ 
b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -70,6 +70,15 @@ public abstract class ConnManager { public abstract String [] getColumnNames(String tableName); /** + * Return a list of stored procedure argument names in the order + * that they are declared. + */ + public String [] getColumnNamesForProcedure(String procedureName) { + throw new UnsupportedOperationException( + "No stored procedure support for this database"); + } + + /** * Return a list of column names in query in the order returned by the db. */ public String [] getColumnNamesForQuery(String query) { @@ -247,6 +256,18 @@ public abstract class ConnManager { /** * Return an unordered mapping from colname to sqltype for + * all the input arguments for a stored procedure. + * + * The Integer type id is a constant from java.sql.Types + */ + public Map<String, Integer> getColumnTypesForProcedure( + String procedureName) { + throw new UnsupportedOperationException( + "No stored procedure support for this database"); + } + + /** + * Return an unordered mapping from colname to sqltype for * all columns in a table or query. * * The Integer type id is a constant from java.sql.Types @@ -475,6 +496,16 @@ public abstract class ConnManager { } /** + * Export data stored in HDFS into a table in a database. This calls a stored + * procedure to insert rows into the target table. + */ + public void callTable(com.cloudera.sqoop.manager.ExportJobContext context) + throws IOException, ExportException { + throw new ExportException("This database does not support exports " + + "using stored procedures"); + } + + /** * Export updated data stored in HDFS into a database table. * This updates existing rows in the target table, based on the * updateKeyCol specified in the context's SqoopOptions. 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/manager/SqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index 3a52c6d..03c9e64 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -45,12 +45,15 @@ import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.mapreduce.JdbcCallExportJob; import org.apache.sqoop.util.SqlTypeMap; /** @@ -161,6 +164,48 @@ public abstract class SqlManager } } + @Override + public String[] getColumnNamesForProcedure(String procedureName) { + List<String> ret = new ArrayList<String>(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn) { + int index = results.getInt("ORDINAL_POSITION") - 1; + if (index < 0) { + continue; // actually the return type + } + for(int i = ret.size(); i < index; ++i) { + ret.add(null); + } + String name = results.getString("COLUMN_NAME"); + if (index == ret.size()) { + ret.add(name); + } else { + ret.set(index, name); + } + } + } + return ret.toArray(new String[ret.size()]); + } finally { + results.close(); + getConnection().commit(); + } + } catch (SQLException sqlException) { + LOG.error("Error reading procedure metadata: " + + sqlException.toString()); + return null; + } + } + /** * @return 
the SQL query to use in getColumnTypes() in case this logic must * be tuned per-database, but the main extraction loop is still inheritable. @@ -322,6 +367,42 @@ public abstract class SqlManager } @Override + public Map<String, Integer> getColumnTypesForProcedure(String procedureName) { + Map<String, Integer> ret = new TreeMap<String, Integer>(); + try { + DatabaseMetaData metaData = this.getConnection().getMetaData(); + ResultSet results = metaData.getProcedureColumns(null, null, + procedureName, null); + if (null == results) { + return null; + } + + try { + while (results.next()) { + if (results.getInt("COLUMN_TYPE") + != DatabaseMetaData.procedureColumnReturn + && results.getInt("ORDINAL_POSITION") > 0) { + // we don't care if we get several rows for the + // same ORDINAL_POSITION (e.g. like H2 gives us) + // as we'll just overwrite the entry in the map: + ret.put( + results.getString("COLUMN_NAME"), + results.getInt("DATA_TYPE")); + } + } + return ret.isEmpty() ? null : ret; + } finally { + results.close(); + getConnection().commit(); + } + } catch (SQLException sqlException) { + LOG.error("Error reading procedure metadata: " + + sqlException.toString()); + return null; + } + } + + @Override public String[] listTables() { ResultSet results = null; String [] tableTypes = {"TABLE"}; @@ -692,6 +773,15 @@ public abstract class SqlManager exportJob.runExport(); } + @Override + public void callTable(com.cloudera.sqoop.manager.ExportJobContext context) + throws IOException, + ExportException { + context.setConnManager(this); + JdbcCallExportJob exportJob = new JdbcCallExportJob(context); + exportJob.runExport(); + } + public void release() { if (null != this.lastStatement) { try { http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java
b/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java new file mode 100644 index 0000000..7dc3453 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.mapreduce.db.DBConfiguration; + +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Insert the emitted keys as records into a database table. + * This supports a configurable "spill threshold" at which + * point intermediate transactions are committed. + * + * Record objects are buffered before actually performing the INSERT + * statements; this requires that the key implement the + * SqoopRecord interface. 
+ * + * Uses DBOutputFormat/DBConfiguration for configuring the output. + */ +public class ExportCallOutputFormat<K extends SqoopRecord, V> + extends AsyncSqlOutputFormat<K, V> { + + private static final Log LOG = LogFactory.getLog( + ExportCallOutputFormat.class); + + @Override + /** {@inheritDoc} */ + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + DBConfiguration dbConf = new DBConfiguration(conf); + + // Sanity check all the configuration values we need. + if (null == conf.get(DBConfiguration.URL_PROPERTY)) { + throw new IOException("Database connection URL is not set."); + } else if (null == dbConf.getOutputTableName()) { + throw new IOException("Procedure name is not set for export"); + } else if (null == dbConf.getOutputFieldNames() + && 0 == dbConf.getOutputFieldCount()) { + throw new IOException( + "Output field names are null and zero output field count set."); + } + } + + @Override + /** {@inheritDoc} */ + public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new ExportCallRecordWriter(context); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * RecordWriter to write the output to a row in a database table. + * The actual database updates are executed in a second thread. + */ + public class ExportCallRecordWriter extends AsyncSqlRecordWriter<K, V> { + + protected String procedureName; + protected String [] columnNames; // The columns to insert into. + protected int columnCount; // If columnNames is null, tells ## of cols. 
+ + public ExportCallRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException, SQLException { + super(context); + + Configuration conf = getConf(); + + DBConfiguration dbConf = new DBConfiguration(conf); + procedureName = dbConf.getOutputTableName(); + columnNames = dbConf.getOutputFieldNames(); + columnCount = dbConf.getOutputFieldCount(); + } + + @Override + /** {@inheritDoc} */ + protected PreparedStatement getPreparedStatement( + List<SqoopRecord> userRecords) throws SQLException { + + PreparedStatement stmt = null; + + // Synchronize on connection to ensure this does not conflict + // with the operations in the update thread. + Connection conn = getConnection(); + synchronized (conn) { + stmt = conn.prepareCall(getCallStatement(userRecords.size())); + } + + for (SqoopRecord record : userRecords) { + record.write(stmt, 0); + stmt.addBatch(); + } + + return stmt; + } + + @Override + protected boolean isBatchExec() { + return true; + } + + /** + * @return a "{call proc (?, ...)}" statement with one '?' per argument. + */ + protected String getCallStatement(int numRows) { + StringBuilder sb = new StringBuilder(); + + sb.append("{call " + procedureName + " ("); + + int numSlots = columnNames == null ?
columnCount : columnNames.length; + if (numSlots > 0) { + sb.append("?"); + } + for(int i = 1; i < numSlots; ++i) { + sb.append(", ?"); + } + + sb.append(")}"); + + return sb.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java new file mode 100644 index 0000000..2459698 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.db.DBOutputFormat; + +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.google.common.base.Strings; + +/** + * Run an export using JDBC (JDBC-based ExportCallOutputFormat) to + * call the stored procedure. + */ +public class JdbcCallExportJob extends JdbcExportJob { + public static final String SQOOP_EXPORT_CALL_KEY = "sqoop.export.call"; + + public static final Log LOG = LogFactory.getLog( + JdbcCallExportJob.class.getName()); + + public JdbcCallExportJob(final ExportJobContext context) { + super(context, null, null, ExportCallOutputFormat.class); + } + + public JdbcCallExportJob(final ExportJobContext ctxt, + final Class<? extends Mapper> mapperClass, + final Class<? extends InputFormat> inputFormatClass, + final Class<? extends OutputFormat> outputFormatClass) { + super(ctxt, mapperClass, inputFormatClass, outputFormatClass); + } + + /** + * makes sure the job knows what stored procedure to call. 
+ */ + @Override + protected void propagateOptionsToJob(Job job) { + super.propagateOptionsToJob(job); + job.getConfiguration().set(SQOOP_EXPORT_CALL_KEY, options.getCall()); + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws IOException { + String procedureName = job.getConfiguration().get(SQOOP_EXPORT_CALL_KEY); + + ConnManager mgr = context.getConnManager(); + try { + if (Strings.isNullOrEmpty(options.getUsername())) { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + options.getConnectionParams()); + } else { + DBConfiguration.configureDB(job.getConfiguration(), + mgr.getDriverClass(), + options.getConnectString(), + options.getUsername(), + options.getPassword(), + options.getConnectionParams()); + } + + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = mgr.getColumnNamesForProcedure(procedureName); + } + DBOutputFormat.setOutput( + job, + mgr.escapeTableName(procedureName), + colNames); + + job.setOutputFormatClass(getOutputFormatClass()); + job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load OutputFormat", cnfe); + } + } + +} + http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java index 00bd910..20636a0 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java @@ -68,8 +68,15 @@ public class JdbcExportJob extends ExportJobBase { if (fileType == FileType.AVRO_DATA_FILE) { LOG.debug("Configuring for Avro export"); ConnManager connManager = context.getConnManager(); - Map<String, 
Integer> columnTypeInts = - connManager.getColumnTypes(tableName, options.getSqlQuery()); + Map<String, Integer> columnTypeInts; + if (options.getCall() == null) { + columnTypeInts = connManager.getColumnTypes( + tableName, + options.getSqlQuery()); + } else { + columnTypeInts = connManager.getColumnTypesForProcedure( + options.getCall()); + } MapWritable columnTypes = new MapWritable(); for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) { Text columnName = new Text(e.getKey()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/orm/ClassWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java index 47e1221..126b406 100644 --- a/src/java/org/apache/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/sqoop/orm/ClassWriter.java @@ -1209,6 +1209,10 @@ public class ClassWriter { if (null != tableName) { // Table-based import. Read column names from table. colNames = connManager.getColumnNames(tableName); + } else if (options.getCall() != null) { + // Read procedure arguments from metadata + colNames = connManager.getColumnNamesForProcedure( + this.options.getCall()); } else { // Infer/assign column names for arbitrary query. 
colNames = connManager.getColumnNamesForQuery( @@ -1236,7 +1240,11 @@ public class ClassWriter { } protected Map<String, Integer> getColumnTypes() throws IOException { - return connManager.getColumnTypes(tableName, options.getSqlQuery()); + if (options.getCall() == null) { + return connManager.getColumnTypes(tableName, options.getSqlQuery()); + } else { + return connManager.getColumnTypesForProcedure(options.getCall()); + } } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index b4b2213..684d4a5 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -142,6 +142,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String HELP_ARG = "help"; public static final String UPDATE_KEY_ARG = "update-key"; public static final String UPDATE_MODE_ARG = "update-mode"; + public static final String CALL_ARG = "call"; // Arguments for validation. public static final String VALIDATE_ARG = "validate"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/tool/ExportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ExportTool.java b/src/java/org/apache/sqoop/tool/ExportTool.java index acd296d..215addd 100644 --- a/src/java/org/apache/sqoop/tool/ExportTool.java +++ b/src/java/org/apache/sqoop/tool/ExportTool.java @@ -73,6 +73,9 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { // Mixed update/insert export manager.upsertTable(context); } + } else if (options.getCall() != null) { + // Stored procedure-based export. + manager.callTable(context); } else { // INSERT-based export. 
manager.exportTable(context); @@ -176,6 +179,12 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { + "new rows are found with non-matching keys in database") .withLongOpt(UPDATE_MODE_ARG) .create()); + exportOpts.addOption(OptionBuilder + .hasArg() + .withDescription("Populate the table using this stored " + + "procedure (one call per row)") + .withLongOpt(CALL_ARG) + .create()); addValidationOpts(exportOpts); @@ -273,6 +282,10 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { out.setClearStagingTable(true); } + if (in.hasOption(CALL_ARG)) { + out.setCall(in.getOptionValue(CALL_ARG)); + } + applyValidationOptions(in, out); applyNewUpdateOptions(in, out); applyInputFormatOptions(in, out); @@ -290,8 +303,9 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { */ protected void validateExportOptions(SqoopOptions options) throws InvalidOptionsException { - if (options.getTableName() == null) { - throw new InvalidOptionsException("Export requires a --table argument." + if (options.getTableName() == null && options.getCall() == null) { + throw new InvalidOptionsException( + "Export requires a --table or a --call argument." 
+ HELP_STR); } else if (options.getExportDir() == null) { throw new InvalidOptionsException( @@ -322,8 +336,25 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { } else if (options.doClearStagingTable() && options.getStagingTableName() == null) { // Option to clear staging table specified but not the staging table name - throw new InvalidOptionsException("Option to clear the staging table is " + throw new InvalidOptionsException( + "Option to clear the staging table is " + "specified but the staging table name is not."); + } else if (options.getCall() != null + && options.getStagingTableName() != null) { + // using a stored procedure to insert rows is incompatible with using + // a staging table (as we don't know where the procedure will put the + // data, or what transactions it'll perform) + throw new InvalidOptionsException( + "Option to use a staging table is " + + "specified as well as a call option."); + } else if (options.getCall() != null && options.getUpdateKeyCol() != null) { + throw new InvalidOptionsException( + "Option to call a stored procedure " + + "can't be used in update mode."); + } else if (options.getCall() != null && options.getTableName() != null) { + // we don't know if the stored procedure will insert rows into + // a given table + throw new InvalidOptionsException("Can't specify --call and --table."); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/SmokeTests.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java index 76df6cf..c5dc860 100644 --- a/src/test/com/cloudera/sqoop/SmokeTests.java +++ b/src/test/com/cloudera/sqoop/SmokeTests.java @@ -18,6 +18,8 @@ package com.cloudera.sqoop; +import org.apache.sqoop.TestExportUsingProcedure; + import com.cloudera.sqoop.hive.TestHiveImport; import com.cloudera.sqoop.hive.TestTableDefWriter; import
com.cloudera.sqoop.io.TestLobFile; @@ -82,6 +84,7 @@ public final class SmokeTests { suite.addTestSuite(TestBooleanParser.class); suite.addTestSuite(TestMerge.class); suite.addTestSuite(TestToolPlugin.class); + suite.addTestSuite(TestExportUsingProcedure.class); suite.addTest(MapreduceTests.suite()); return suite; http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/TestConnFactory.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestConnFactory.java b/src/test/com/cloudera/sqoop/TestConnFactory.java index 893b388..c0b295e 100644 --- a/src/test/com/cloudera/sqoop/TestConnFactory.java +++ b/src/test/com/cloudera/sqoop/TestConnFactory.java @@ -122,6 +122,10 @@ public class TestConnFactory extends TestCase { return null; } + public String[] getColumnNamesForProcedure(String procedureName) { + return null; + } + public String getPrimaryKey(String tableName) { return null; } @@ -148,6 +152,12 @@ public class TestConnFactory extends TestCase { return null; } + @Override + public Map<String, Integer> getColumnTypesForProcedure( + String procedureName) { + return null; + } + public ResultSet readTable(String tableName, String [] columns) { return null; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/TestExport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestExport.java b/src/test/com/cloudera/sqoop/TestExport.java index eba10aa..0b650af 100644 --- a/src/test/com/cloudera/sqoop/TestExport.java +++ b/src/test/com/cloudera/sqoop/TestExport.java @@ -795,6 +795,18 @@ public class TestExport extends ExportJobTestCase { } public void testColumnsExport() throws IOException, SQLException { + testColumnsExport("id,msg," + forIdx(0) + "," + forIdx(2)); + } + + /** + * It's possible to change the column string that + * {@link #testColumnsExport()} uses - you might want to 
do + * this if your database randomly generates column names, instead + * of using the given ones (e.g. stored procedure parameter + * names in H2) + */ + protected void testColumnsExport( + String columnsStr) throws IOException, SQLException { final int TOTAL_COLUMNS = 3; final int TOTAL_RECORDS = 10; @@ -842,7 +854,6 @@ public class TestExport extends ExportJobTestCase { createTextFile(0, TOTAL_RECORDS, false, gen0, gen2); createTable(gen0, gen1, gen2); - String columnsStr = "id,msg," + forIdx(0) + "," + forIdx(2); runExport(getArgv(true, 10, 10, "--columns", columnsStr)); ColumnGenerator genNull = new NullColumnGenerator(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java index be449e4..e85e62a 100644 --- a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java +++ b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java @@ -50,6 +50,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { static final String DATABASE_USER = "sqooptest"; static final String DATABASE_NAME = "sqooptest"; static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE"; static final String STAGING_TABLE_NAME = "STAGING"; static final String SCHEMA_PUBLIC = "public"; static final String SCHEMA_SPECIAL = "special"; @@ -80,6 +81,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC); createTable(TABLE_NAME, SCHEMA_SPECIAL); createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL); + createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC); LOG.debug("setUp complete."); } @@ -95,8 +97,86 @@ public class PostgresqlExportTest extends ExportJobTestCase { } } - public void createTable(String tableName, 
String schema) { - SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); + private interface CreateIt { + void createIt( + Statement st, + String fullName, + ConnManager manager) throws SQLException; + } + + private void createTable(String tableName, String schema) { + CreateIt createIt = new CreateIt() { + @Override + public void createIt( + Statement st, + String fullName, + ConnManager manager) throws SQLException { + st.executeUpdate("CREATE TABLE " + fullName + " (" + + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("salary") + " FLOAT, " + + manager.escapeColName("dept") + " VARCHAR(32))"); + } + }; + create(tableName, "TABLE", schema, createIt); + } + + private void createProcedure(String procedureName, String schema) { + CreateIt createIt = new CreateIt() { + @Override + public void createIt( + Statement st, + String fullName, + ConnManager manager) throws SQLException { + st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " (" + + "IN " + manager.escapeColName("id") + " INT," + + "IN " + manager.escapeColName("name") + " VARCHAR(24)," + + "IN " + manager.escapeColName("start_date") + " DATE," + + "IN " + manager.escapeColName("salary") + " FLOAT," + + "IN " + manager.escapeColName("dept") + " VARCHAR(32)" + + ") " + + "RETURNS VOID " + + "AS $$ " + + "BEGIN " + + "INSERT INTO " + + escapeTableOrSchemaName(SCHEMA_PUBLIC) + + "." 
+ + escapeTableOrSchemaName(TABLE_NAME) + + " (" + + manager.escapeColName("id") + +", " + + manager.escapeColName("name") + +", " + + manager.escapeColName("start_date") + +", " + + manager.escapeColName("salary") + +", " + + manager.escapeColName("dept") + + ") VALUES (" + + manager.escapeColName("id") + +", " + + manager.escapeColName("name") + +", " + + manager.escapeColName("start_date") + +", " + + manager.escapeColName("salary") + +", " + + manager.escapeColName("dept") + + ");" + + "END;" + + "$$ LANGUAGE plpgsql;"); + } + }; + create(procedureName, "FUNCTION", schema, createIt); + } + + private void create( + String name, + String type, + String schema, + CreateIt createIt) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, name); options.setUsername(DATABASE_USER); ConnManager manager = null; @@ -118,26 +198,24 @@ public class PostgresqlExportTest extends ExportJobTestCase { } String fullTableName = escapeTableOrSchemaName(schema) - + "." + escapeTableOrSchemaName(tableName); + + "." + escapeTableOrSchemaName(name); try { // Try to remove the table first. DROP TABLE IF EXISTS didn't // get added until pg 8.3, so we just use "DROP TABLE" and ignore // any exception here if one occurs. - st.executeUpdate("DROP TABLE " + fullTableName); + st.executeUpdate("DROP " + type + " " + fullTableName); } catch (SQLException e) { - LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)", + LOG.info("Couldn't drop " + + type.toLowerCase() + + " " +fullTableName + + " (ok)", e); // Now we need to reset the transaction. 
connection.rollback(); } - st.executeUpdate("CREATE TABLE " + fullTableName + " (" - + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " - + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " - + manager.escapeColName("start_date") + " DATE, " - + manager.escapeColName("salary") + " FLOAT, " - + manager.escapeColName("dept") + " VARCHAR(32))"); + createIt.createIt(st, fullTableName, manager); connection.commit(); } catch (SQLException sqlE) { @@ -161,14 +239,19 @@ public class PostgresqlExportTest extends ExportJobTestCase { LOG.debug("setUp complete."); } - private String [] getArgv(String tableName, + private String [] getArgv(boolean useTable, String... extraArgs) { ArrayList<String> args = new ArrayList<String>(); CommonArgs.addHadoopFlags(args); - args.add("--table"); - args.add(tableName); + if (useTable) { + args.add("--table"); + args.add(TABLE_NAME); + } else { + args.add("--call"); + args.add(PROCEDURE_NAME); + } args.add("--export-dir"); args.add(getWarehouseDir()); args.add("--fields-terminated-by"); @@ -208,7 +291,18 @@ public class PostgresqlExportTest extends ExportJobTestCase { "3,Fred,2009-01-23,15,marketing", }); - runExport(getArgv(TABLE_NAME)); + runExport(getArgv(true)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportUsingProcedure() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + runExport(getArgv(false)); assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); } @@ -221,7 +315,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, }; - runExport(getArgv(TABLE_NAME, extra)); + runExport(getArgv(true, extra)); assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); } @@ -234,7 +328,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { String[] extra 
= new String[] {"--direct"}; - runExport(getArgv(TABLE_NAME, extra)); + runExport(getArgv(true, extra)); assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); } @@ -250,7 +344,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { SCHEMA_SPECIAL, }; - runExport(getArgv(TABLE_NAME, extra)); + runExport(getArgv(true, extra)); assertRowCount(2, escapeTableOrSchemaName(SCHEMA_SPECIAL) @@ -272,7 +366,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { SCHEMA_SPECIAL, }; - runExport(getArgv(TABLE_NAME, extra)); + runExport(getArgv(true, extra)); assertRowCount(2, escapeTableOrSchemaName(SCHEMA_SPECIAL) @@ -296,7 +390,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { SCHEMA_SPECIAL, }; - runExport(getArgv(TABLE_NAME, extra)); + runExport(getArgv(true, extra)); assertRowCount(2, escapeTableOrSchemaName(SCHEMA_SPECIAL) @@ -317,7 +411,7 @@ public class PostgresqlExportTest extends ExportJobTestCase { SCHEMA_SPECIAL, }; - runExport(getArgv(TABLE_NAME, extra)); + runExport(getArgv(true, extra)); assertRowCount(2, escapeTableOrSchemaName(SCHEMA_SPECIAL) http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java index 4f6fd37..e13f3df 100644 --- a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java +++ b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java @@ -131,8 +131,10 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase { } } - args.add("--table"); - args.add(getTableName()); + if (usesSQLtable()) { + args.add("--table"); + args.add(getTableName()); + } args.add("--export-dir"); args.add(getTablePath().toString()); args.add("--connect"); @@ -152,6 +154,10 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase { 
return args.toArray(new String[0]); } + protected boolean usesSQLtable() { + return true; + } + /** When exporting text columns, what should the text contain? */ protected String getMsgPrefix() { return "textfield"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/org/apache/sqoop/TestExportUsingProcedure.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestExportUsingProcedure.java b/src/test/org/apache/sqoop/TestExportUsingProcedure.java new file mode 100644 index 0000000..6414ef7 --- /dev/null +++ b/src/test/org/apache/sqoop/TestExportUsingProcedure.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Types; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.manager.GenericJdbcManager; +import org.apache.sqoop.tool.ExportTool; +import org.h2.Driver; +import org.junit.After; +import org.junit.Before; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.TestExport; + +/** + * We'll use H2 as a database as the version of HSQLDB we currently depend on + * (1.8) doesn't include support for stored procedures. + */ +public class TestExportUsingProcedure extends TestExport { + private static final String PROCEDURE_NAME = "INSERT_PROCEDURE"; + /** + * Stored procedures are static; we'll need an instance to get a connection. + */ + private static TestExportUsingProcedure instanceForProcedure; + private int functionCalls = 0; + private String[] names; + private String[] types; + private Connection connection; + + @Override + @Before + public void setUp() { + super.setUp(); + instanceForProcedure = this; + } + + @Override + public void createTable(ColumnGenerator... 
extraColumns) throws SQLException { + super.createTable(extraColumns); + names = new String[extraColumns.length]; + types = new String[extraColumns.length]; + for (int i = 0; i < extraColumns.length; ++i) { + names[i] = forIdx(i); + types[i] = extraColumns[i].getType(); + } + createProcedure(names, types); + } + + private void createProcedure(String[] extraNames, String[] extraTypes) + throws SQLException { + StringBuilder drop = new StringBuilder("DROP ALIAS IF EXISTS "); + drop.append(PROCEDURE_NAME); + + StringBuilder create = new StringBuilder("CREATE ALIAS "); + create.append(PROCEDURE_NAME); + create.append(" FOR \""); + create.append(getClass().getName()); + create.append(".insertFunction"); + if (extraNames.length > 0) { + create.append(getName()); + } + create.append('"'); + + Connection conn = getConnection(); + Statement statement = conn.createStatement(); + try { + statement.execute(drop.toString()); + statement.execute(create.toString()); + conn.commit(); + } finally { + statement.close(); + } + } + + @Override + protected String getConnectString() { + return "jdbc:h2:mem:" + getName(); + } + + @Override + protected void verifyExport(int expectedNumRecords, Connection conn) + throws IOException, SQLException { + assertEquals("stored procedure must be called for each row", + expectedNumRecords, functionCalls); + super.verifyExport(expectedNumRecords, conn); + } + + @Override + protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStmt, + int statementsPerTx, String... additionalArgv) { + // we need different class names per test, or the classloader will + // just use the old class definition even though we've compiled a + // new one! 
+ String[] args = newStrArray(additionalArgv, "--" + ExportTool.CALL_ARG, + PROCEDURE_NAME, "--" + ExportTool.CLASS_NAME_ARG, getName(), "--" + + ExportTool.CONN_MANAGER_CLASS_NAME, + GenericJdbcManager.class.getName(), "--" + ExportTool.DRIVER_ARG, + Driver.class.getName()); + return super + .getArgv(includeHadoopFlags, rowsPerStmt, statementsPerTx, args); + } + + @Override + protected String[] getCodeGenArgv(String... extraArgs) { + String[] myExtraArgs = newStrArray(extraArgs, "--" + + ExportTool.CONN_MANAGER_CLASS_NAME, + GenericJdbcManager.class.getName(), "--" + ExportTool.DRIVER_ARG, + Driver.class.getName()); + return super.getCodeGenArgv(myExtraArgs); + } + + @Override + protected Connection getConnection() { + if (connection != null) { + return connection; + } + try { + connection = DriverManager.getConnection(getConnectString()); + connection.setAutoCommit(false); + return connection; + } catch (SQLException e) { + throw new AssertionError(e.getMessage()); + } + } + + /** + * This gets called during {@link #setUp()} to check the non-HSQLDB database + * is valid. We'll therefore set the connection manager here... 
+ */ + @Override + protected SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions ret = new SqoopOptions(conf); + if (ret.getConnManagerClassName() == null) { + ret.setConnManagerClassName(GenericJdbcManager.class.getName()); + } + if (ret.getDriverClassName() == null) { + ret.setDriverClassName(Driver.class.getName()); + } + return ret; + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected boolean usesSQLtable() { + return false; + } + + @Override + @After + public void tearDown() { + super.tearDown(); + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + // don't really care, it's only in memory + } + } + } + + // TEST OVERRIDES + + @Override + public void testMultiMapTextExportWithStaging() throws IOException, + SQLException { + try { + super.testMultiMapTextExportWithStaging(); + fail("staging tables not compatible with --call"); + } catch (IOException e) { + // expected + } + } + + @Override + public void testMultiTransactionWithStaging() throws IOException, + SQLException { + try { + super.testMultiTransactionWithStaging(); + fail("staging tables not compatible with --call"); + } catch (IOException e) { + // expected + } + } + + /** + * H2 renames the stored procedure arguments P1, P2, ..., Pn. 
+ */ + @Override + public void testColumnsExport() throws IOException, SQLException { + super.testColumnsExport("P1,P2,P3,P4"); + } + + // STORED PROCEDURES + + private interface SetExtraArgs { + void set(PreparedStatement on) throws SQLException; + } + + private static void insertFunction(int id, String msg, + SetExtraArgs setExtraArgs) throws SQLException { + instanceForProcedure.functionCalls += 1; + Connection con = instanceForProcedure.getConnection(); + + StringBuilder sql = new StringBuilder("insert into "); + sql.append(instanceForProcedure.getTableName()); + sql.append("(id, msg"); + for (int i = 0; i < instanceForProcedure.names.length; ++i) { + sql.append(", "); + sql.append(instanceForProcedure.names[i]); + } + sql.append(") values ("); + sql.append(StringUtils.repeat("?", ", ", + instanceForProcedure.names.length + 2)); + sql.append(")"); + + PreparedStatement statement = con.prepareStatement(sql.toString()); + try { + statement.setInt(1, id); + statement.setString(2, msg); + setExtraArgs.set(statement); + statement.execute(); + con.commit(); + } finally { + statement.close(); + } + } + + public static void insertFunction(int id, String msg) throws SQLException { + insertFunction(id, msg, new SetExtraArgs() { + @Override + public void set(PreparedStatement on) throws SQLException { + } + }); + } + + public static void insertFunctiontestIntCol(int id, String msg, + final int testIntCol) throws SQLException { + insertFunction(id, msg, new SetExtraArgs() { + @Override + public void set(PreparedStatement on) throws SQLException { + on.setInt(3, testIntCol); + } + }); + } + + public static void insertFunctiontestBigIntCol(int id, String msg, + final long testBigIntCol) throws SQLException { + insertFunction(id, msg, new SetExtraArgs() { + @Override + public void set(PreparedStatement on) throws SQLException { + on.setLong(3, testBigIntCol); + } + }); + } + + public static void insertFunctiontestDatesAndTimes(int id, String msg, + final Date date, final Time 
time) throws SQLException { + insertFunction(id, msg, new SetExtraArgs() { + @Override + public void set(PreparedStatement on) throws SQLException { + on.setDate(3, date); + on.setTime(4, time); + } + }); + } + + public static void insertFunctiontestNumericTypes(int id, String msg, + final BigDecimal f, final BigDecimal d) throws SQLException { + insertFunction(id, msg, new SetExtraArgs() { + @Override + public void set(PreparedStatement on) throws SQLException { + on.setBigDecimal(3, f); + on.setBigDecimal(4, d); + } + }); + } + + /** + * This test case is special - we're only inserting into a subset of the + * columns in the table. + */ + public static void insertFunctiontestColumnsExport(int id, String msg, + final int int1, final int int2) throws SQLException { + insertFunction(id, msg, new SetExtraArgs() { + @Override + public void set(PreparedStatement on) throws SQLException { + on.setInt(3, int1); + on.setNull(4, Types.INTEGER); + on.setInt(5, int2); + } + }); + } + +}
