[https://issues.apache.org/jira/browse/HADOOP-4331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Alexander Schwid updated HADOOP-4331:
-------------------------------------
Description:
package mapred.lib.db
added batch size support for JDBC in DBOutputFormat
receive the DBWritable object in the value, not in the key, in DBOutputFormat
was:
add batch size support for JDBC in DBOutputFormat
receive the DBWritable object in the value, not in the key, in DBOutputFormat
---------------patch--------------
Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java	(revision 701034)
+++ src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java	(working copy)
@@ -80,6 +80,11 @@
/** Field names in the Output table */
public static final String OUTPUT_FIELD_NAMES_PROPERTY =
"mapred.jdbc.output.field.names";
+ /** Batch size for output statement */
+ public static final String OUTPUT_BATCH_SIZE = "mapred.jdbc.output.batch.size";
+
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+
/**
* Sets the DB access related fields in the JobConf.
* @param job the job
@@ -212,5 +217,12 @@
job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
}
+ int getBatchSize() {
+ return job.getInt(DBConfiguration.OUTPUT_BATCH_SIZE, DEFAULT_BATCH_SIZE);
+ }
+
+ void setBatchSize(int sz) {
+ job.setInt(DBConfiguration.OUTPUT_BATCH_SIZE, sz);
+ }
}
Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java	(revision 701034)
+++ src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java	(working copy)
@@ -37,11 +37,11 @@
* A OutputFormat that sends the reduce output to a SQL table.
* <p>
* {@link DBOutputFormat} accepts <key,value> pairs, where
- * key has a type extending DBWritable. Returned {@link RecordWriter}
- * writes <b>only the key</b> to the database with a batch SQL query.
+ * value has a type extending DBWritable. Returned {@link RecordWriter}
+ * writes <b>only the value</b> to the database with a batch SQL query.
*
*/
-public class DBOutputFormat<K extends DBWritable, V>
+public class DBOutputFormat<K, V extends DBWritable>
implements OutputFormat<K,V> {
private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
@@ -54,27 +54,21 @@
private Connection connection;
private PreparedStatement statement;
+ private int batch = 0;
+ private int batchSize;
protected DBRecordWriter(Connection connection
- , PreparedStatement statement) throws SQLException {
+ , PreparedStatement statement, int batchSize) throws SQLException {
this.connection = connection;
this.statement = statement;
this.connection.setAutoCommit(false);
+ this.batchSize = batchSize;
}
/** {@inheritDoc} */
public void close(Reporter reporter) throws IOException {
try {
- statement.executeBatch();
- connection.commit();
- } catch (SQLException e) {
- try {
- connection.rollback();
- }
- catch (SQLException ex) {
- LOG.warn(StringUtils.stringifyException(ex));
- }
- throw new IOException(e.getMessage());
+ executeBatch();
} finally {
try {
statement.close();
@@ -89,12 +83,37 @@
/** {@inheritDoc} */
public void write(K key, V value) throws IOException {
try {
- key.write(statement);
+ value.write(statement);
statement.addBatch();
+ batch++;
+ if (batch == batchSize) {
+ executeBatch();
+ batch = 0;
+ }
+
} catch (SQLException e) {
e.printStackTrace();
}
}
+
+ private void executeBatch() throws IOException {
+ if (batch > 0) {
+ try {
+ statement.executeBatch();
+ connection.commit();
+ statement.clearBatch();
+ }
+ catch(SQLException e) {
+ try {
+ connection.rollback();
+ }
+ catch (SQLException ex) {
+ LOG.warn(StringUtils.stringifyException(ex));
+ }
+ throw new IOException(e.getMessage());
+ }
+ }
+ }
}
/**
@@ -129,13 +148,14 @@
DBConfiguration dbConf = new DBConfiguration(job);
String tableName = dbConf.getOutputTableName();
String[] fieldNames = dbConf.getOutputFieldNames();
+ int batchSize = dbConf.getBatchSize();
try {
Connection connection = dbConf.getConnection();
PreparedStatement statement = null;
statement = connection.prepareStatement(constructQuery(tableName,
fieldNames));
- return new DBRecordWriter(connection, statement);
+ return new DBRecordWriter(connection, statement, batchSize);
}
catch (Exception ex) {
throw new IOException(ex.getMessage());
> DBOutputFormat: add batch size support for JDBC and receive DBWritable
> object in value not in key
> --------------------------------------------------------------------------------------------------
>
> Key: HADOOP-4331
> URL: https://issues.apache.org/jira/browse/HADOOP-4331
> Project: Hadoop Core
> Issue Type: Improvement
> Components: mapred
> Reporter: Alexander Schwid
> Priority: Minor
> Fix For: 0.19.0
>
> Attachments: patch.txt
>
>
> package mapred.lib.db
> added batch size support for JDBC in DBOutputFormat
> receive the DBWritable object in the value, not in the key, in DBOutputFormat
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.