Repository: sqoop
Updated Branches:
  refs/heads/trunk 462bd9170 -> d03faf354
SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support (Andy Skelton via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d03faf35
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d03faf35
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d03faf35

Branch: refs/heads/trunk
Commit: d03faf3544b1ef07c9496fa7641ed6a42f58cb1d
Parents: 462bd91
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Sat Jun 28 16:42:34 2014 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Sat Jun 28 16:42:34 2014 -0700

----------------------------------------------------------------------
 .../mysql/MySQLUpsertOutputFormat.java | 47 ++++++++++++++++----
 1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d03faf35/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
index e6c758b..72fffc4 100644
--- a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.List;
 
 /**
  * Output format for MySQL Update/insert functionality. We will use MySQL
@@ -63,7 +66,33 @@ public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
      * {@inheritDoc}
      */
     @Override
-    protected String getUpdateStatement() {
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
+      }
+
+      // Inject the record parameters into the UPDATE and WHERE clauses. This
+      // assumes that the update key column is the last column serialized in
+      // by the underlying record. Our code auto-gen process for exports was
+      // responsible for taking care of this constraint.
+      int i = 0;
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, i);
+        i += columnNames.length;
+      }
+      stmt.addBatch();
+
+      return stmt;
+    }
+
+    protected String getUpdateStatement(int numRows) {
       boolean first;
       StringBuilder sb = new StringBuilder();
       sb.append("INSERT INTO ");
@@ -80,14 +109,16 @@ public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
       }
 
       sb.append(") VALUES(");
-      first = true;
-      for (int i = 0; i < columnNames.length; i++) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(", ");
+      for (int i = 0; i < numRows; i++) {
+        if (i > 0) {
+          sb.append("),(");
+        }
+        for (int j = 0; j < columnNames.length; j++) {
+          if (j > 0) {
+            sb.append(", ");
+          }
+          sb.append("?");
         }
-        sb.append("?");
       }
 
       sb.append(") ON DUPLICATE KEY UPDATE ");
