Updated Branches: refs/heads/trunk 5eb987a78 -> c499f4909
SQOOP-604: Easy throttling feature for MySQL exports (Zoltan Toth-Czifra via Abhijeet Gaikwad) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c499f490 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c499f490 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c499f490 Branch: refs/heads/trunk Commit: c499f49097ebf04f9fac34f1df768a319e679cea Parents: 5eb987a Author: Abhijeet Gaikwad <[email protected]> Authored: Sat Nov 3 10:48:49 2012 +0530 Committer: Abhijeet Gaikwad <[email protected]> Committed: Sat Nov 3 10:48:49 2012 +0530 ---------------------------------------------------------------------- .../apache/sqoop/mapreduce/MySQLExportMapper.java | 33 +++++++++++++++ 1 files changed, 33 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/c499f490/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java index a4e8b88..dc1c126 100644 --- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java @@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN> // Configured value for MSYQL_CHECKPOINT_BYTES_KEY. protected long checkpointDistInBytes; + /** Configuration key that specifies the number of milliseconds + * to sleep at the end of each checkpoint commit + * Default is 0, no sleep. + */ + public static final String MYSQL_CHECKPOINT_SLEEP_KEY = + "sqoop.mysql.export.sleep.ms"; + + public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0; + + // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY. + protected long checkpointSleepMs; + protected Configuration conf; /** The FIFO being used to communicate with mysqlimport. */ @@ -314,6 +326,21 @@ public class MySQLExportMapper<KEYIN, VALIN> LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY); this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES; } + + this.checkpointSleepMs = conf.getLong( + MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS); + + if (this.checkpointSleepMs < 0) { + LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY); + this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS; + } + + if (this.checkpointSleepMs >= conf.getLong("mapred.task.timeout", 0)) { + LOG.warn("Value for " + + MYSQL_CHECKPOINT_SLEEP_KEY + + " has to be smaller than mapred.task.timeout"); + this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS; + } } /** @@ -347,6 +374,12 @@ public class MySQLExportMapper<KEYIN, VALIN> if (this.checkpointDistInBytes != 0 && this.bytesWritten > this.checkpointDistInBytes) { LOG.info("Checkpointing current export."); + + if (this.checkpointSleepMs != 0) { + LOG.info("Pausing."); + Thread.sleep(this.checkpointSleepMs); + } + closeExportHandles(); initMySQLImportProcess(); this.bytesWritten = 0;
