This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new cde0f80  TEZ-4145: Reduce lock contention in TezSpillRecord (László 
Bodor reviewed by Ashutosh Chauhan, Jonathan Turner Eagles, Rajesh Balamohan)
cde0f80 is described below

commit cde0f806063b9023ded2436cbb23b22d541b7024
Author: László Bodor <[email protected]>
AuthorDate: Tue Apr 14 23:02:48 2020 +0200

    TEZ-4145: Reduce lock contention in TezSpillRecord (László Bodor reviewed 
by Ashutosh Chauhan, Jonathan Turner Eagles, Rajesh Balamohan)
    
    Signed-off-by: Laszlo Bodor <[email protected]>
    (cherry picked from commit 7dbec63e1f97eea95ab998e16ffcd592ff6be332)
---
 .../org/apache/tez/shufflehandler/IndexCache.java  |  13 ++-
 .../org/apache/tez/auxservices/IndexCache.java     |  13 ++-
 .../apache/tez/auxservices/TestShuffleHandler.java |   3 +-
 .../runtime/library/common/shuffle/Fetcher.java    |   5 +-
 .../orderedgrouped/FetcherOrderedGrouped.java      |   6 +-
 .../shuffle/orderedgrouped/ShuffleScheduler.java   |   6 +-
 .../library/common/sort/impl/ExternalSorter.java   |   2 +
 .../library/common/sort/impl/PipelinedSorter.java  |   8 +-
 .../library/common/sort/impl/TezSpillRecord.java   |  23 ++---
 .../common/sort/impl/dflt/DefaultSorter.java       |  20 ++--
 .../writers/BaseUnorderedPartitionedKVWriter.java  |   8 ++
 .../writers/UnorderedPartitionedKVWriter.java      |   6 +-
 .../library/output/OrderedPartitionedKVOutput.java |   7 +-
 .../library/common/shuffle/TestShuffleUtils.java   |   2 +-
 .../common/shuffle/orderedgrouped/TestFetcher.java | 108 +++++++++++----------
 15 files changed, 140 insertions(+), 90 deletions(-)

diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
index e358fcc..51224cd 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
@@ -19,6 +19,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -38,11 +39,21 @@ class IndexCache {
 
   private final LinkedBlockingQueue<String> queue =
       new LinkedBlockingQueue<String>();
+  private FileSystem fs;
 
   public IndexCache(Configuration conf) {
     this.conf = conf;
     totalMemoryAllowed = 10 * 1024 * 1024;
     LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+    initLocalFs();
+  }
+
+  private void initLocalFs() {
+    try {
+      this.fs = FileSystem.getLocal(conf).getRaw();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -114,7 +125,7 @@ class IndexCache {
     LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
     TezSpillRecord tmp = null;
     try {
-      tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+      tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
     } catch (Throwable e) {
       tmp = new TezSpillRecord(0);
       cache.remove(mapId);
diff --git 
a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
 
b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
index 1a9cfb2..625f7ab 100644
--- 
a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
+++ 
b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
@@ -19,6 +19,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -38,12 +39,22 @@ class IndexCache {
 
   private final LinkedBlockingQueue<String> queue =
       new LinkedBlockingQueue<String>();
+  private FileSystem fs;
   public static final String INDEX_CACHE_MB = "tez.shuffle.indexcache.mb";
 
   public IndexCache(Configuration conf) {
     this.conf = conf;
     totalMemoryAllowed = conf.getInt(INDEX_CACHE_MB, 10) * 1024 * 1024;
     LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+    initLocalFs();
+  }
+
+  private void initLocalFs() {
+    try {
+      this.fs = FileSystem.getLocal(conf).getRaw();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -160,7 +171,7 @@ class IndexCache {
     }
     TezSpillRecord tmp = null;
     try {
-      tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+      tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
     } catch (Throwable e) {
       tmp = new TezSpillRecord(0);
       cache.remove(mapId);
diff --git 
a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
 
b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index b75a160..a610236 100644
--- 
a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ 
b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -897,7 +897,8 @@ public class TestShuffleHandler {
     TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
     tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0);
     tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1);
-    tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, 
crc);
+    tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf,
+        FileSystem.getLocal(conf).getRaw(), crc);
   }
 
   @Test
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index fa883e4..f2412e3 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -235,6 +235,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
   }
 
+
   // helper method to populate the remaining map
   void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
     if (srcAttemptsRemaining == null) {
@@ -357,7 +358,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         }
 
         spillRec.putIndex(indexRec, 0);
-        spillRec.writeToFile(tmpIndex, conf);
+        spillRec.writeToFile(tmpIndex, conf, localFs);
         // everything went well so far - rename it
         boolean renamed = localFs.rename(tmpIndex, outputPath
             .suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
@@ -735,7 +736,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
         Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
 
-    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, localFs);
     idxRecord = spillRecord.getIndex(partition);
     return idxRecord;
   }
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index af9b929..6b8ccb0 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounter;
@@ -74,6 +75,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final ExceptionReporter exceptionReporter;
   private final int id;
   private final String logIdentifier;
+  private final RawLocalFileSystem localFs;
   private final String localShuffleHost;
   private final int localShufflePort;
   private final String applicationId;
@@ -114,6 +116,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                boolean ifileReadAhead, int 
ifileReadAheadLength,
                                CompressionCodec codec,
                                Configuration conf,
+                               RawLocalFileSystem localFs,
                                boolean localDiskFetchEnabled,
                                String localHostname,
                                int shufflePort,
@@ -159,6 +162,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       this.codec = null;
     }
     this.conf = conf;
+    this.localFs = localFs;
     this.localShuffleHost = localHostname;
     this.localShufflePort = shufflePort;
 
@@ -772,7 +776,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       throws IOException {
     Path indexFile = getShuffleInputFileName(pathComponent,
         Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, localFs);
     return spillRecord.getIndex(partitionId);
   }
 
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 707f920..ff07e91 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -65,6 +65,8 @@ import 
org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -214,6 +216,7 @@ class ShuffleScheduler {
   private final int ifileReadAheadLength;
   private final CompressionCodec codec;
   private final Configuration conf;
+  private final RawLocalFileSystem localFs;
   private final boolean localDiskFetchEnabled;
   private final String localHostname;
   private final int shufflePort;
@@ -263,6 +266,7 @@ class ShuffleScheduler {
                           String srcNameTrimmed) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
+    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
     this.exceptionReporter = exceptionReporter;
     this.allocator = allocator;
     this.mergeManager = mergeManager;
@@ -1464,7 +1468,7 @@ class ShuffleScheduler {
   FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
     return new FetcherOrderedGrouped(httpConnectionParams, 
ShuffleScheduler.this, allocator,
         exceptionReporter, jobTokenSecretManager, ifileReadAhead, 
ifileReadAheadLength,
-        codec, conf, localDiskFetchEnabled, localHostname, shufflePort, 
srcNameTrimmed, mapHost,
+        codec, conf, localFs, localDiskFetchEnabled, localHostname, 
shufflePort, srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
         connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, 
asyncHttp, sslShuffle,
         verifyDiskChecksum, compositeFetch);
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index a9860ef..6fb1d94 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -98,6 +98,7 @@ public abstract class ExternalSorter {
   protected final Combiner combiner;
   protected final Partitioner partitioner;
   protected final Configuration conf;
+  protected final RawLocalFileSystem localFs;
   protected final FileSystem rfs;
   protected final TezTaskOutput mapOutputFile;
   protected final int partitions;
@@ -167,6 +168,7 @@ public abstract class ExternalSorter {
       long initialMemoryAvailable) throws IOException {
     this.outputContext = outputContext;
     this.conf = conf;
+    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
     this.partitions = numOutputs;
     reportPartitionStats = ReportPartitionStats.fromString(
         conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 1c14d81..53d087d 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -529,7 +529,7 @@ public class PipelinedSorter extends ExternalSorter {
       }
 
       spillFileIndexPaths.put(numSpills, indexFilename);
-      spillRec.writeToFile(indexFilename, conf);
+      spillRec.writeToFile(indexFilename, conf, localFs);
       //TODO: honor cache limits
       indexCacheList.add(spillRec);
       ++numSpills;
@@ -619,7 +619,7 @@ public class PipelinedSorter extends ExternalSorter {
         mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
             * MAP_OUTPUT_INDEX_RECORD_LENGTH);
       spillFileIndexPaths.put(numSpills, indexFilename);
-      spillRec.writeToFile(indexFilename, conf);
+      spillRec.writeToFile(indexFilename, conf, localFs);
       //TODO: honor cache limits
       indexCacheList.add(spillRec);
       ++numSpills;
@@ -734,7 +734,7 @@ public class PipelinedSorter extends ExternalSorter {
               + "finalIndexFile=" + finalIndexFile + ", filename=" + filename 
+ ", indexFilename=" +
               indexFilename);
         }
-        TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, conf);
+        TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, 
localFs);
         if (reportPartitionStats()) {
           for (int i = 0; i < spillRecord.size(); i++) {
             partitionStats[i] += spillRecord.getIndex(i).getPartLength();
@@ -830,7 +830,7 @@ public class PipelinedSorter extends ExternalSorter {
       numShuffleChunks.setValue(1); //final merge has happened.
       
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
 
-      spillRec.writeToFile(finalIndexFile, conf);
+      spillRec.writeToFile(finalIndexFile, conf, localFs);
       finalOut.close();
       for (int i = 0; i < numSpills; i++) {
         Path indexFilename = spillFileIndexPaths.get(i);
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index 48bd211..e16b7a0 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -49,20 +49,23 @@ public class TezSpillRecord {
     entries = buf.asLongBuffer();
   }
 
-  public TezSpillRecord(Path indexFileName, Configuration job) throws 
IOException {
-    this(indexFileName, job, null);
+  public TezSpillRecord(Path indexFileName, Configuration conf) throws 
IOException {
+    this(indexFileName, FileSystem.getLocal(conf).getRaw());
   }
 
-  public TezSpillRecord(Path indexFileName, Configuration job, String 
expectedIndexOwner)
+  public TezSpillRecord(Path indexFileName, FileSystem fs) throws IOException {
+    this(indexFileName, fs, null);
+  }
+
+  public TezSpillRecord(Path indexFileName, FileSystem fs, String 
expectedIndexOwner)
     throws IOException {
-    this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
+    this(indexFileName, fs, new PureJavaCrc32(), expectedIndexOwner);
   }
 
-  public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
+  public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
                      String expectedIndexOwner)
       throws IOException {
 
-    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     final FSDataInputStream in = rfs.open(indexFileName);
     try {
       final long length = rfs.getFileStatus(indexFileName).getLen();
@@ -117,14 +120,12 @@ public class TezSpillRecord {
   /**
    * Write this spill record to the location provided.
    */
-  public void writeToFile(Path loc, Configuration job)
-      throws IOException {
-    writeToFile(loc, job, new PureJavaCrc32());
+  public void writeToFile(Path loc, Configuration job, FileSystem fs) throws 
IOException {
+    writeToFile(loc, job, fs, new PureJavaCrc32());
   }
 
-  public void writeToFile(Path loc, Configuration job, Checksum crc)
+  public void writeToFile(Path loc, Configuration job, FileSystem rfs, 
Checksum crc)
       throws IOException {
-    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     CheckedOutputStream chk = null;
     final FSDataOutputStream out = rfs.create(loc);
     try {
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index cb551bd..71e8e3f 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -971,7 +971,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
             mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                 * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         spillFileIndexPaths.put(numSpills, indexFilename);
-        spillRec.writeToFile(indexFilename, conf);
+        spillRec.writeToFile(indexFilename, conf, localFs);
       } else {
         indexCacheList.add(spillRec);
         totalIndexCacheMemory +=
@@ -1053,7 +1053,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
             mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                 * MAP_OUTPUT_INDEX_RECORD_LENGTH);
         spillFileIndexPaths.put(numSpills, indexFilename);
-         spillRec.writeToFile(indexFilename, conf);
+        spillRec.writeToFile(indexFilename, conf, localFs);
       } else {
         indexCacheList.add(spillRec);
         totalIndexCacheMemory +=
@@ -1193,13 +1193,13 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
       TezSpillRecord spillRecord = indexCacheList.get(i);
       if (spillRecord == null) {
         //File was already written and location is stored in 
spillFileIndexPaths
-        spillRecord = new TezSpillRecord(spillFileIndexPaths.get(i), conf);
+        spillRecord = new TezSpillRecord(spillFileIndexPaths.get(i), localFs);
       } else {
         //Double check if this file has to be written
         if (spillFileIndexPaths.get(i) == null) {
           Path indexPath = mapOutputFile.getSpillIndexFileForWrite(i, 
partitions *
               MAP_OUTPUT_INDEX_RECORD_LENGTH);
-          spillRecord.writeToFile(indexPath, conf);
+          spillRecord.writeToFile(indexPath, conf, localFs);
         }
       }
 
@@ -1228,10 +1228,10 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
         sameVolRename(filename[0], finalOutputFile);
         if (indexCacheList.size() == 0) {
           sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
-          spillRecord = new TezSpillRecord(finalIndexFile, conf);
+          spillRecord = new TezSpillRecord(finalIndexFile, localFs);
         } else {
           spillRecord = indexCacheList.get(0);
-          spillRecord.writeToFile(finalIndexFile, conf);
+          spillRecord.writeToFile(finalIndexFile, conf, localFs);
         }
       } else {
         List<Event> events = Lists.newLinkedList();
@@ -1239,7 +1239,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
         spillRecord = indexCacheList.get(0);
         Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1, 
partitions *
             MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRecord.writeToFile(indexPath, conf);
+        spillRecord.writeToFile(indexPath, conf, localFs);
         maybeSendEventForSpill(events, true, spillRecord, 0, true);
         
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
         //No need to populate finalIndexFile, finalOutputFile etc when 
finalMerge is disabled
@@ -1256,7 +1256,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
     // read in paged indices
     for (int i = indexCacheList.size(); i < numSpills; ++i) {
       Path indexFileName = spillFileIndexPaths.get(i);
-      indexCacheList.add(new TezSpillRecord(indexFileName, conf));
+      indexCacheList.add(new TezSpillRecord(indexFileName, localFs));
     }
 
     //Check if it is needed to do final merge. Or else, exit early.
@@ -1309,7 +1309,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
           outputBytesWithOverheadCounter.increment(rawLength);
           sr.putIndex(rec, i);
         }
-        sr.writeToFile(finalIndexFile, conf);
+        sr.writeToFile(finalIndexFile, conf, localFs);
       } finally {
         finalOut.close();
       }
@@ -1392,7 +1392,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
         }
       }
       numShuffleChunks.setValue(1); //final merge has happened
-      spillRec.writeToFile(finalIndexFile, conf);
+      spillRec.writeToFile(finalIndexFile, conf, localFs);
       finalOut.close();
       for(int i = 0; i < numSpills; i++) {
         rfs.delete(filename[i],true);
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 30d1adb..9bf1517 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -25,6 +25,8 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -48,6 +50,7 @@ public abstract class BaseUnorderedPartitionedKVWriter 
extends KeyValuesWriter {
   
   protected final OutputContext outputContext;
   protected final Configuration conf;
+  protected final RawLocalFileSystem localFs;
   protected final Partitioner partitioner;
   protected final Class keyClass;
   protected final Class valClass;
@@ -105,6 +108,11 @@ public abstract class BaseUnorderedPartitionedKVWriter 
extends KeyValuesWriter {
   public BaseUnorderedPartitionedKVWriter(OutputContext outputContext, 
Configuration conf, int numOutputs) {
     this.outputContext = outputContext;
     this.conf = conf;
+    try {
+      this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     this.numPartitions = numOutputs;
     
     // k/v serialization
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index c979cc0..9e87098 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -725,7 +725,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
           TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
           TezSpillRecord sr = new TezSpillRecord(1);
           sr.putIndex(rec, 0);
-          sr.writeToFile(finalIndexPath, conf);
+          sr.writeToFile(finalIndexPath, conf, localFs);
 
           BitSet emptyPartitions = new BitSet();
           if (outputRecordsCounter.getValue() == 0) {
@@ -1055,7 +1055,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       }
       deleteIntermediateSpills();
     }
-    finalSpillRecord.writeToFile(finalIndexPath, conf);
+    finalSpillRecord.writeToFile(finalIndexPath, conf, localFs);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
     LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " 
+ numSpills.get() + " spills");
   }
@@ -1148,7 +1148,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       throws IOException {
     if (spillPathDetails.indexFilePath != null) {
       //write the index record
-      spillRecord.writeToFile(spillPathDetails.indexFilePath, conf);
+      spillRecord.writeToFile(spillPathDetails.indexFilePath, conf, localFs);
     } else {
       //add to cache
       SpillInfo spillInfo = new SpillInfo(spillRecord, 
spillPathDetails.outputFilePath);
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 441f1c2..92d45cf 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
@@ -67,6 +69,7 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
 
   protected ExternalSorter sorter;
   protected Configuration conf;
+  private RawLocalFileSystem localFs;
   protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private long startTime;
   private long endTime;
@@ -88,6 +91,8 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
   public synchronized List<Event> initialize() throws IOException {
     this.startTime = System.nanoTime();
     this.conf = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+
     // Initializing this parametr in this conf since it is used in multiple
     // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
     // TezMerger, etc.
@@ -205,7 +210,7 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
       String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
           TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, 
isLastEvent,
-          getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), 
conf),
+          getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), 
localFs),
           getNumPhysicalOutputs(), sendEmptyPartitionDetails, 
getContext().getUniqueIdentifier(),
           sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), 
auxiliaryService, deflater);
     }
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index f61c7e5..cc918fa 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -149,7 +149,7 @@ public class TestShuffleUtils {
       startOffset += partLen;
       spillRecord.putIndex(indexRecord, i);
     }
-    spillRecord.writeToFile(path, conf);
+    spillRecord.writeToFile(path, conf, FileSystem.getLocal(conf).getRaw());
     return path;
   }
 
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 6d30448..ec0eeee 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -59,7 +59,9 @@ import 
org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -126,11 +128,10 @@ public class TestFetcher {
     doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
 
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
-            null, conf, false, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
-            wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true, false);
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0, null, conf,
+            getRawFs(conf), false, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, 
connectionErrsCounter,
+            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
 
     fetcher.call();
     verify(scheduler).getMapsForHost(mapHost);
@@ -154,11 +155,10 @@ public class TestFetcher {
     final boolean DISABLE_LOCAL_FETCH = false;
     MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
-            null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
-            wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true, false);
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0, null, conf,
+            getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", 
mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, 
connectionErrsCounter,
+            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
 
     // when local mode is enabled and host and port matches use local fetch
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -172,11 +172,10 @@ public class TestFetcher {
     // if hostname does not match use http
     mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1);
     fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
-            null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
-            wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true, false);
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0, null, conf,
+            getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", 
mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, 
connectionErrsCounter,
+            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -188,11 +187,10 @@ public class TestFetcher {
     // if port does not match use http
     mapHost = new MapHost(HOST, PORT + 1, 0, 1);
     fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
-            null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
-            wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true, false);
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0, null, conf,
+            getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", 
mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, 
connectionErrsCounter,
+            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -203,11 +201,10 @@ public class TestFetcher {
 
     //if local fetch is not enabled
     mapHost = new MapHost(HOST, PORT, 0, 1);
-    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, 
null, false, 0,
-        null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
-        wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
-        false, false, true, false);
+    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, 
null, false, 0, null,
+        conf, getRawFs(conf), DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", 
mapHost, ioErrsCounter,
+        wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, 
connectionErrsCounter,
+        wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -228,10 +225,10 @@ public class TestFetcher {
     when(inputContext.getSourceVertexName()).thenReturn("");
 
     MapHost host = new MapHost(HOST, PORT, 1, 1);
-    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle, null, false, 0,
-        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
-        false, false, true, false);
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle,
+        null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src 
vertex", host,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, 
false, true, false);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
 
@@ -338,10 +335,10 @@ public class TestFetcher {
     when(inputContext.getSourceVertexName()).thenReturn("");
 
     MapHost host = new MapHost(HOST, PORT, 1, 1);
-    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle, null, false, 0,
-        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
-        false, false, true, false);
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle,
+        null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src 
vertex", host,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, 
false, true, false);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
     final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -413,10 +410,10 @@ public class TestFetcher {
     when(inputContext.getSourceVertexName()).thenReturn("");
 
     MapHost host = new MapHost(HOST, PORT, 1, 2);
-    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle, null, false, 0,
-        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
-        false, false, true, false);
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle,
+        null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src 
vertex", host,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, 
false, true, false);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
 
@@ -587,10 +584,10 @@ public class TestFetcher {
 
     HttpConnectionParams httpConnectionParams = 
ShuffleUtils.getHttpConnectionParams(conf);
     final MapHost host = new MapHost(HOST, PORT, 1, 1);
-    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, 
scheduler, merger, shuffle, null, false, 0,
-        null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
-        false, false, true, false);
+    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, 
scheduler, merger, shuffle,
+        null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src 
vertex", host,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, 
false, true, false);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
 
 
@@ -676,12 +673,10 @@ public class TestFetcher {
     HttpConnectionParams httpConnectionParams = 
ShuffleUtils.getHttpConnectionParams(conf);
     final MapHost host = new MapHost(HOST, PORT, 1, 1);
     FetcherOrderedGrouped mockFetcher =
-        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, 
shuffle, jobMgr,
-            false, 0,
-            null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
-            wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
-            true, false, true, false);
+        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, 
shuffle, jobMgr, false,
+            0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", 
host, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, 
connectionErrsCounter,
+            wrongReduceErrsCounter, APP_ID, DAG_ID, true, false, true, false);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
     fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -743,12 +738,10 @@ public class TestFetcher {
     MergeManager merger = mock(MergeManager.class);
     Shuffle shuffle = mock(Shuffle.class);
     MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
-    FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, 
false, 0,
-            null, conf, false, HOST, PORT, "src vertex", mapHost, 
ioErrsCounter,
-            wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true, false);
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle,
+        null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src 
vertex", mapHost,
+        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, 
wrongMapErrsCounter,
+        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, 
false, true, false);
     fetcher.populateRemainingMap(new 
LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
     Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
     Iterator<Entry<String, InputAttemptIdentifier>> iterator = 
fetcher.remaining.entrySet().iterator();
@@ -758,4 +751,13 @@ public class TestFetcher {
       Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) 
== 0);
     }
   }
+
+  private RawLocalFileSystem getRawFs(Configuration conf) {
+    try {
+      return (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      throw new RuntimeException(e);
+    }
+  }
 }

Reply via email to