Author: ddas
Date: Mon Nov 17 04:23:15 2008
New Revision: 718229
URL: http://svn.apache.org/viewvc?rev=718229&view=rev
Log:
HADOOP-4188. Removes task's dependency on concrete filesystems. Contributed by
Sharad Agarwal.
Removed:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_FileSystemCounter.properties
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Nov 17 04:23:15 2008
@@ -30,6 +30,9 @@
HADOOP-4628. Move Hive into a standalone subproject. (omalley)
+ HADOOP-4188. Removes task's dependency on concrete filesystems.
+ (Sharad Agarwal via ddas)
+
NEW FEATURES
HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java Mon Nov 17 04:23:15 2008
@@ -68,11 +68,15 @@
private static final Map<Class<? extends FileSystem>, Statistics>
statisticsTable =
new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
+
+ /** Recording statistics per FileSystem URI scheme */
+ private static final Map<String, Statistics> statsByUriScheme =
+ new HashMap<String, Statistics>();
/**
* The statistics for this file system.
*/
- protected final Statistics statistics;
+ protected Statistics statistics;
/**
* A cache of files that should be deleted when filsystem is closed
@@ -1365,6 +1369,7 @@
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
+ statsByUriScheme.put(uri.getScheme(), fs.statistics);
return fs;
}
@@ -1516,7 +1521,16 @@
}
/**
+ * Get the Map of Statistics object indexed by URI Scheme.
+ * @return a Map having a key as URI scheme and value as Statistics object
+ */
+ public static synchronized Map<String, Statistics> getStatistics() {
+ return statsByUriScheme;
+ }
+
+ /**
* Get the statistics for a particular file system
+ * @deprecated Consider using {@link #getStatistics()} instead.
* @param cls the class to lookup
* @return a statistics object
*/
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java Mon Nov 17 04:23:15 2008
@@ -52,6 +52,7 @@
public FilterFileSystem(FileSystem fs) {
this.fs = fs;
+ this.statistics = fs.statistics;
}
/** Called after a new FileSystem instance is constructed.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Nov 17 04:23:15 2008
@@ -23,8 +23,10 @@
import java.io.IOException;
import java.text.NumberFormat;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,13 +34,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.fs.kfs.KosmosFileSystem;
-import org.apache.hadoop.fs.s3.S3FileSystem;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
@@ -75,13 +73,18 @@
/**
* Counters to measure the usage of the different file systems.
+ * Always return the String array with two elements. First one is the name of
+ * BYTES_READ counter and second one is of the BYTES_WRITTEN counter.
*/
- protected static enum FileSystemCounter {
- LOCAL_READ, LOCAL_WRITE,
- HDFS_READ, HDFS_WRITE,
- S3_READ, S3_WRITE,
- KFS_READ, KFSWRITE
+ protected static String[] getFileSystemCounterNames(String uriScheme) {
+ String scheme = uriScheme.toUpperCase();
+ return new String[]{scheme+"_BYTES_READ", scheme+"_BYTES_WRITTEN"};
}
+
+ /**
+ * Name of the FileSystem counters' group
+ */
+ protected static final String FILESYSTEM_COUNTER_GROUP =
"FileSystemCounters";
///////////////////////////////////////////////////////////
// Helper methods to construct task-output paths
@@ -513,15 +516,11 @@
private FileSystem.Statistics stats;
private Counters.Counter readCounter = null;
private Counters.Counter writeCounter = null;
- private FileSystemCounter read;
- private FileSystemCounter write;
-
- FileSystemStatisticUpdater(FileSystemCounter read,
- FileSystemCounter write,
- Class<? extends FileSystem> cls) {
- stats = FileSystem.getStatistics(cls);
- this.read = read;
- this.write = write;
+ private String[] counterNames;
+
+ FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
+ this.stats = stats;
+ this.counterNames = getFileSystemCounterNames(uriScheme);
}
void updateCounters() {
@@ -529,14 +528,16 @@
long newWriteBytes = stats.getBytesWritten();
if (prevReadBytes != newReadBytes) {
if (readCounter == null) {
- readCounter = counters.findCounter(read);
+ readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP,
+ counterNames[0]);
}
readCounter.increment(newReadBytes - prevReadBytes);
prevReadBytes = newReadBytes;
}
if (prevWriteBytes != newWriteBytes) {
if (writeCounter == null) {
- writeCounter = counters.findCounter(write);
+ writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP,
+ counterNames[1]);
}
writeCounter.increment(newWriteBytes - prevWriteBytes);
prevWriteBytes = newWriteBytes;
@@ -545,31 +546,20 @@
}
/**
- * A list of all of the file systems that we want to report on.
+ * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
*/
- private List<FileSystemStatisticUpdater> statisticUpdaters =
- new ArrayList<FileSystemStatisticUpdater>();
- {
- statisticUpdaters.add
- (new FileSystemStatisticUpdater(FileSystemCounter.LOCAL_READ,
- FileSystemCounter.LOCAL_WRITE,
- RawLocalFileSystem.class));
- statisticUpdaters.add
- (new FileSystemStatisticUpdater(FileSystemCounter.HDFS_READ,
- FileSystemCounter.HDFS_WRITE,
- DistributedFileSystem.class));
- statisticUpdaters.add
- (new FileSystemStatisticUpdater(FileSystemCounter.KFS_READ,
- FileSystemCounter.KFSWRITE,
- KosmosFileSystem.class));
- statisticUpdaters.add
- (new FileSystemStatisticUpdater(FileSystemCounter.S3_READ,
- FileSystemCounter.S3_WRITE,
- S3FileSystem.class));
- }
-
+ private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+ new HashMap<String, FileSystemStatisticUpdater>();
+
private synchronized void updateCounters() {
- for(FileSystemStatisticUpdater updater: statisticUpdaters) {
+ for(Map.Entry<String, FileSystem.Statistics> entry :
+ FileSystem.getStatistics().entrySet()) {
+ String uriScheme = entry.getKey();
+ FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
+ if(updater==null) {//new FileSystem has been found in the cache
+ updater = new FileSystemStatisticUpdater(uriScheme, entry.getValue());
+ statisticUpdaters.put(uriScheme, updater);
+ }
updater.updateCounters();
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java Mon Nov 17 04:23:15 2008
@@ -67,8 +67,7 @@
}
public void testCounters() throws IOException {
- Enum[] keysWithResource = {Task.FileSystemCounter.HDFS_READ,
- Task.Counter.MAP_INPUT_BYTES,
+ Enum[] keysWithResource = {Task.Counter.MAP_INPUT_BYTES,
Task.Counter.MAP_OUTPUT_BYTES};
Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Nov 17 04:23:15 2008
@@ -203,9 +203,11 @@
assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
Counters counters = result.job.getCounters();
long hdfsRead =
- counters.findCounter(Task.FileSystemCounter.HDFS_READ).getCounter();
+ counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
long hdfsWrite =
- counters.findCounter(Task.FileSystemCounter.HDFS_WRITE).getCounter();
+ counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
assertEquals(result.output.length(), hdfsWrite);
assertEquals(input.length(), hdfsRead);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Mon Nov 17 04:23:15 2008
@@ -35,8 +35,6 @@
import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
import org.apache.hadoop.mapred.lib.IdentityReducer;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.HDFS_WRITE;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.LOCAL_READ;
public class TestReduceFetch extends TestCase {
@@ -107,8 +105,10 @@
job.set("mapred.job.reduce.input.buffer.percent", "0.0");
job.setNumMapTasks(3);
Counters c = runJob(job);
- final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
- final long localRead = c.findCounter(LOCAL_READ).getCounter();
+ final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+ final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("file")[0]).getCounter();
assertTrue("Expected more bytes read from local (" +
localRead + ") than written to HDFS (" + hdfsWritten + ")",
hdfsWritten <= localRead);
@@ -126,8 +126,10 @@
job.setNumTasksToExecutePerJvm(1);
job.set("mapred.job.shuffle.merge.percent", "1.0");
Counters c = runJob(job);
- final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
- final long localRead = c.findCounter(LOCAL_READ).getCounter();
+ final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+ final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("file")[0]).getCounter();
assertTrue("Expected at least 1MB fewer bytes read from local (" +
localRead + ") than written to HDFS (" + hdfsWritten + ")",
hdfsWritten >= localRead + 1024 * 1024);
@@ -138,7 +140,8 @@
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
job.setNumMapTasks(3);
Counters c = runJob(job);
- final long localRead = c.findCounter(LOCAL_READ).getCounter();
+ final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
+ Task.getFileSystemCounterNames("file")[0]).getCounter();
assertTrue("Non-zero read from local: " + localRead, localRead == 0);
}