Author: cdouglas
Date: Thu May 15 13:46:51 2008
New Revision: 656828
URL: http://svn.apache.org/viewvc?rev=656828&view=rev
Log:
HADOOP-3350. Add an argument to distcp to permit the user to limit the
number of maps.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656828&r1=656827&r2=656828&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 15 13:46:51 2008
@@ -134,6 +134,9 @@
HADOOP-3355. Enhances Configuration class to accept hex numbers for getInt
and getLong. (Amareshwari Sriramadasu via ddas)
+ HADOOP-3350. Add an argument to distcp to permit the user to limit the
+ number of maps. (cdouglas)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=656828&r1=656827&r2=656828&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu May 15 13:46:51 2008
@@ -74,6 +74,7 @@
"\n -p alone is equivalent to -prbugp" +
"\n-i Ignore failures" +
"\n-log <logdir> Write logs to <logdir>" +
+ "\n-m <num_maps> Maximum number of simultaneous copies" +
"\n-overwrite Overwrite destination" +
"\n-update Overwrite if src size different from dst size" +
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
@@ -139,10 +140,12 @@
static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
static final String DST_DIR_LABEL = NAME + ".dest.path";
static final String JOB_DIR_LABEL = NAME + ".job.dir";
+ static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
static final String SRC_LIST_LABEL = NAME + ".src.list";
static final String SRC_COUNT_LABEL = NAME + ".src.count";
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
+ static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
static final String PRESERVE_STATUS_LABEL
= Options.PRESERVE_STATUS.propertyname + ".value";
@@ -708,6 +711,16 @@
throw new IllegalArgumentException("logdir not specified in -log");
}
log = new Path(args[idx]);
+ } else if ("-m".equals(args[idx])) {
+ if (++idx == args.length) {
+ throw new IllegalArgumentException("num_maps not specified in -m");
+ }
+ try {
+ conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx]));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid argument to -m: " +
+ args[idx]);
+ }
} else if ('-' == args[idx].codePointAt(0)) {
throw new IllegalArgumentException("Invalid switch " + args[idx]);
} else if (idx == args.length -1) {
@@ -793,17 +806,22 @@
/**
- * Calculate how many maps to run.
- * Number of maps is bounded by a minimum of the cumulative size of the copy /
- * BYTES_PER_MAP and at most MAX_MAPS_PER_NODE * nodes in the
- * cluster.
+ * Calculate how many maps to run and set that count on the job.
+ * The count is the cumulative size of the copy divided by
+ * distcp.bytes.per.map (default BYTES_PER_MAP), capped at
+ * distcp.max.map.tasks (set by -m on the command line, default
+ * MAX_MAPS_PER_NODE * nodes in the cluster), and never less than one.
 * @param totalBytes Count of total bytes for job
- * @param numNodes the number of nodes in cluster
- * @return Count of maps to run.
+ * @param job The job to configure
+ * @throws IOException if creating the JobClient or querying the cluster
+ * status fails
+ * @throws IllegalArgumentException if distcp.bytes.per.map is not positive
 */
- private static int getMapCount(long totalBytes, int numNodes) {
- int numMaps = (int)(totalBytes / BYTES_PER_MAP);
- numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
- return Math.max(numMaps, 1);
+ private static void setMapCount(long totalBytes, JobConf job)
+ throws IOException {
+ long bytesPerMap = job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP);
+ if (bytesPerMap <= 0) {
+ // Reject explicitly instead of failing later with a bare
+ // ArithmeticException (zero) or a meaningless quotient (negative).
+ throw new IllegalArgumentException(BYTES_PER_MAP_LABEL
+ + " must be positive, got " + bytesPerMap);
+ }
+ int maxMaps = job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
+ new JobClient(job).getClusterStatus().getTaskTrackers());
+ // Take the minimum in long arithmetic: casting the quotient to int first
+ // can overflow (a small bytes.per.map on a large copy) and collapse the
+ // job to a single map. The result is <= maxMaps, so the cast is safe.
+ long numMaps = Math.min(totalBytes / bytesPerMap, (long) maxMaps);
+ job.setNumMapTasks((int) Math.max(numMaps, 1L));
}
/** Fully delete dir */
@@ -989,8 +1007,7 @@
LOG.info("srcCount=" + srcCount);
jobConf.setInt(SRC_COUNT_LABEL, srcCount);
jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
- jobConf.setNumMapTasks(getMapCount(cbsize,
- new JobClient(jobConf).getClusterStatus().getTaskTrackers()));
+ setMapCount(cbsize, jobConf);
}
static private void checkDuplication(FileSystem fs, Path file, Path sorted,
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=656828&r1=656827&r2=656828&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu May 15 13:46:51 2008
@@ -27,6 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.util.CopyFiles;
import org.apache.hadoop.util.ToolRunner;
@@ -519,4 +521,50 @@
if (cluster != null) { cluster.shutdown(); }
}
}
+
+ /**
+ * Verifies that distcp.bytes.per.map and the -m switch bound the number
+ * of maps. The number of entries distcp writes under the -log directory
+ * is used as the observed map count.
+ */
+ public void testMapCount() throws Exception {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ try {
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs = dfs.getFileSystem();
+ String namenode = fs.getUri().toString();
+ mr = new MiniMRCluster(3, namenode, 1);
+ MyFile[] files = createFiles(fs.getUri(), "/srcdat");
+ long totsize = 0;
+ for (MyFile f : files) {
+ totsize += f.getSize();
+ }
+ JobConf job = mr.createJobConf();
+ // Size-based estimate is totsize / (totsize / 3), i.e. at least 3 maps;
+ // -m 100 is meant to be high enough not to be the binding cap.
+ job.setLong("distcp.bytes.per.map", totsize / 3);
+ ToolRunner.run(new CopyFiles(job),
+ new String[] {"-m", "100",
+ "-log",
+ namenode+"/logs",
+ namenode+"/srcdat",
+ namenode+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ FileStatus[] logs = fs.listStatus(new Path(namenode+"/logs"));
+ // rare case where splits are exact, logs.length can be 4
+ assertTrue("Unexpected map count", logs.length == 5 || logs.length == 4);
+
+ deldir(namenode, "/destdat");
+ deldir(namenode, "/logs");
+ // -m 1 caps the job at a single map regardless of bytes.per.map.
+ ToolRunner.run(new CopyFiles(job),
+ new String[] {"-m", "1",
+ "-log",
+ namenode+"/logs",
+ namenode+"/srcdat",
+ namenode+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ logs = fs.listStatus(new Path(namenode+"/logs"));
+ assertTrue("Unexpected map count", logs.length == 2);
+ } finally {
+ // Stop MapReduce before the DFS it runs on, and make sure DFS is shut
+ // down even if stopping the MR cluster throws.
+ try {
+ if (mr != null) { mr.shutdown(); }
+ } finally {
+ if (dfs != null) { dfs.shutdown(); }
+ }
+ }
+ }
+
}