This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new cde0f80 TEZ-4145: Reduce lock contention in TezSpillRecord (László
Bodor reviewed by Ashutosh Chauhan, Jonathan Turner Eagles, Rajesh Balamohan)
cde0f80 is described below
commit cde0f806063b9023ded2436cbb23b22d541b7024
Author: László Bodor <[email protected]>
AuthorDate: Tue Apr 14 23:02:48 2020 +0200
TEZ-4145: Reduce lock contention in TezSpillRecord (László Bodor reviewed
by Ashutosh Chauhan, Jonathan Turner Eagles, Rajesh Balamohan)
Signed-off-by: Laszlo Bodor <[email protected]>
(cherry picked from commit 7dbec63e1f97eea95ab998e16ffcd592ff6be332)
---
.../org/apache/tez/shufflehandler/IndexCache.java | 13 ++-
.../org/apache/tez/auxservices/IndexCache.java | 13 ++-
.../apache/tez/auxservices/TestShuffleHandler.java | 3 +-
.../runtime/library/common/shuffle/Fetcher.java | 5 +-
.../orderedgrouped/FetcherOrderedGrouped.java | 6 +-
.../shuffle/orderedgrouped/ShuffleScheduler.java | 6 +-
.../library/common/sort/impl/ExternalSorter.java | 2 +
.../library/common/sort/impl/PipelinedSorter.java | 8 +-
.../library/common/sort/impl/TezSpillRecord.java | 23 ++---
.../common/sort/impl/dflt/DefaultSorter.java | 20 ++--
.../writers/BaseUnorderedPartitionedKVWriter.java | 8 ++
.../writers/UnorderedPartitionedKVWriter.java | 6 +-
.../library/output/OrderedPartitionedKVOutput.java | 7 +-
.../library/common/shuffle/TestShuffleUtils.java | 2 +-
.../common/shuffle/orderedgrouped/TestFetcher.java | 108 +++++++++++----------
15 files changed, 140 insertions(+), 90 deletions(-)
diff --git
a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
index e358fcc..51224cd 100644
---
a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
+++
b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
@@ -19,6 +19,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -38,11 +39,21 @@ class IndexCache {
private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>();
+ private FileSystem fs;
public IndexCache(Configuration conf) {
this.conf = conf;
totalMemoryAllowed = 10 * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+ initLocalFs();
+ }
+
+ private void initLocalFs() {
+ try {
+ this.fs = FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -114,7 +125,7 @@ class IndexCache {
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
TezSpillRecord tmp = null;
try {
- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
} catch (Throwable e) {
tmp = new TezSpillRecord(0);
cache.remove(mapId);
diff --git
a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
index 1a9cfb2..625f7ab 100644
---
a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
+++
b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
@@ -19,6 +19,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -38,12 +39,22 @@ class IndexCache {
private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>();
+ private FileSystem fs;
public static final String INDEX_CACHE_MB = "tez.shuffle.indexcache.mb";
public IndexCache(Configuration conf) {
this.conf = conf;
totalMemoryAllowed = conf.getInt(INDEX_CACHE_MB, 10) * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+ initLocalFs();
+ }
+
+ private void initLocalFs() {
+ try {
+ this.fs = FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -160,7 +171,7 @@ class IndexCache {
}
TezSpillRecord tmp = null;
try {
- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
} catch (Throwable e) {
tmp = new TezSpillRecord(0);
cache.remove(mapId);
diff --git
a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index b75a160..a610236 100644
---
a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++
b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -897,7 +897,8 @@ public class TestShuffleHandler {
TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0);
tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1);
- tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf,
crc);
+ tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf,
+ FileSystem.getLocal(conf).getRaw(), crc);
}
@Test
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index fa883e4..f2412e3 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -235,6 +235,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
}
}
+
// helper method to populate the remaining map
void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
if (srcAttemptsRemaining == null) {
@@ -357,7 +358,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
}
spillRec.putIndex(indexRec, 0);
- spillRec.writeToFile(tmpIndex, conf);
+ spillRec.writeToFile(tmpIndex, conf, localFs);
// everything went well so far - rename it
boolean renamed = localFs.rename(tmpIndex, outputPath
.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
@@ -735,7 +736,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+ TezSpillRecord spillRecord = new TezSpillRecord(indexFile, localFs);
idxRecord = spillRecord.getIndex(partition);
return idxRecord;
}
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index af9b929..6b8ccb0 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounter;
@@ -74,6 +75,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final ExceptionReporter exceptionReporter;
private final int id;
private final String logIdentifier;
+ private final RawLocalFileSystem localFs;
private final String localShuffleHost;
private final int localShufflePort;
private final String applicationId;
@@ -114,6 +116,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
boolean ifileReadAhead, int
ifileReadAheadLength,
CompressionCodec codec,
Configuration conf,
+ RawLocalFileSystem localFs,
boolean localDiskFetchEnabled,
String localHostname,
int shufflePort,
@@ -159,6 +162,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
this.codec = null;
}
this.conf = conf;
+ this.localFs = localFs;
this.localShuffleHost = localHostname;
this.localShufflePort = shufflePort;
@@ -772,7 +776,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
throws IOException {
Path indexFile = getShuffleInputFileName(pathComponent,
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+ TezSpillRecord spillRecord = new TezSpillRecord(indexFile, localFs);
return spillRecord.getIndex(partitionId);
}
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 707f920..ff07e91 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -65,6 +65,8 @@ import
org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
@@ -214,6 +216,7 @@ class ShuffleScheduler {
private final int ifileReadAheadLength;
private final CompressionCodec codec;
private final Configuration conf;
+ private final RawLocalFileSystem localFs;
private final boolean localDiskFetchEnabled;
private final String localHostname;
private final int shufflePort;
@@ -263,6 +266,7 @@ class ShuffleScheduler {
String srcNameTrimmed) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
+ this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
this.exceptionReporter = exceptionReporter;
this.allocator = allocator;
this.mergeManager = mergeManager;
@@ -1464,7 +1468,7 @@ class ShuffleScheduler {
FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
return new FetcherOrderedGrouped(httpConnectionParams,
ShuffleScheduler.this, allocator,
exceptionReporter, jobTokenSecretManager, ifileReadAhead,
ifileReadAheadLength,
- codec, conf, localDiskFetchEnabled, localHostname, shufflePort,
srcNameTrimmed, mapHost,
+ codec, conf, localFs, localDiskFetchEnabled, localHostname,
shufflePort, srcNameTrimmed, mapHost,
ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter,
connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId,
asyncHttp, sslShuffle,
verifyDiskChecksum, compositeFetch);
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index a9860ef..6fb1d94 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -98,6 +98,7 @@ public abstract class ExternalSorter {
protected final Combiner combiner;
protected final Partitioner partitioner;
protected final Configuration conf;
+ protected final RawLocalFileSystem localFs;
protected final FileSystem rfs;
protected final TezTaskOutput mapOutputFile;
protected final int partitions;
@@ -167,6 +168,7 @@ public abstract class ExternalSorter {
long initialMemoryAvailable) throws IOException {
this.outputContext = outputContext;
this.conf = conf;
+ this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
this.partitions = numOutputs;
reportPartitionStats = ReportPartitionStats.fromString(
conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 1c14d81..53d087d 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -529,7 +529,7 @@ public class PipelinedSorter extends ExternalSorter {
}
spillFileIndexPaths.put(numSpills, indexFilename);
- spillRec.writeToFile(indexFilename, conf);
+ spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
@@ -619,7 +619,7 @@ public class PipelinedSorter extends ExternalSorter {
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
- spillRec.writeToFile(indexFilename, conf);
+ spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
@@ -734,7 +734,7 @@ public class PipelinedSorter extends ExternalSorter {
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename
+ ", indexFilename=" +
indexFilename);
}
- TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, conf);
+ TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile,
localFs);
if (reportPartitionStats()) {
for (int i = 0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
@@ -830,7 +830,7 @@ public class PipelinedSorter extends ExternalSorter {
numShuffleChunks.setValue(1); //final merge has happened.
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
- spillRec.writeToFile(finalIndexFile, conf);
+ spillRec.writeToFile(finalIndexFile, conf, localFs);
finalOut.close();
for (int i = 0; i < numSpills; i++) {
Path indexFilename = spillFileIndexPaths.get(i);
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index 48bd211..e16b7a0 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -49,20 +49,23 @@ public class TezSpillRecord {
entries = buf.asLongBuffer();
}
- public TezSpillRecord(Path indexFileName, Configuration job) throws
IOException {
- this(indexFileName, job, null);
+ public TezSpillRecord(Path indexFileName, Configuration conf) throws
IOException {
+ this(indexFileName, FileSystem.getLocal(conf).getRaw());
}
- public TezSpillRecord(Path indexFileName, Configuration job, String
expectedIndexOwner)
+ public TezSpillRecord(Path indexFileName, FileSystem fs) throws IOException {
+ this(indexFileName, fs, null);
+ }
+
+ public TezSpillRecord(Path indexFileName, FileSystem fs, String
expectedIndexOwner)
throws IOException {
- this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
+ this(indexFileName, fs, new PureJavaCrc32(), expectedIndexOwner);
}
- public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
+ public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
String expectedIndexOwner)
throws IOException {
- final FileSystem rfs = FileSystem.getLocal(job).getRaw();
final FSDataInputStream in = rfs.open(indexFileName);
try {
final long length = rfs.getFileStatus(indexFileName).getLen();
@@ -117,14 +120,12 @@ public class TezSpillRecord {
/**
* Write this spill record to the location provided.
*/
- public void writeToFile(Path loc, Configuration job)
- throws IOException {
- writeToFile(loc, job, new PureJavaCrc32());
+ public void writeToFile(Path loc, Configuration job, FileSystem fs) throws
IOException {
+ writeToFile(loc, job, fs, new PureJavaCrc32());
}
- public void writeToFile(Path loc, Configuration job, Checksum crc)
+ public void writeToFile(Path loc, Configuration job, FileSystem rfs,
Checksum crc)
throws IOException {
- final FileSystem rfs = FileSystem.getLocal(job).getRaw();
CheckedOutputStream chk = null;
final FSDataOutputStream out = rfs.create(loc);
try {
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index cb551bd..71e8e3f 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -971,7 +971,7 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
- spillRec.writeToFile(indexFilename, conf);
+ spillRec.writeToFile(indexFilename, conf, localFs);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
@@ -1053,7 +1053,7 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
- spillRec.writeToFile(indexFilename, conf);
+ spillRec.writeToFile(indexFilename, conf, localFs);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
@@ -1193,13 +1193,13 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
TezSpillRecord spillRecord = indexCacheList.get(i);
if (spillRecord == null) {
//File was already written and location is stored in
spillFileIndexPaths
- spillRecord = new TezSpillRecord(spillFileIndexPaths.get(i), conf);
+ spillRecord = new TezSpillRecord(spillFileIndexPaths.get(i), localFs);
} else {
//Double check if this file has to be written
if (spillFileIndexPaths.get(i) == null) {
Path indexPath = mapOutputFile.getSpillIndexFileForWrite(i,
partitions *
MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRecord.writeToFile(indexPath, conf);
+ spillRecord.writeToFile(indexPath, conf, localFs);
}
}
@@ -1228,10 +1228,10 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
sameVolRename(filename[0], finalOutputFile);
if (indexCacheList.size() == 0) {
sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
- spillRecord = new TezSpillRecord(finalIndexFile, conf);
+ spillRecord = new TezSpillRecord(finalIndexFile, localFs);
} else {
spillRecord = indexCacheList.get(0);
- spillRecord.writeToFile(finalIndexFile, conf);
+ spillRecord.writeToFile(finalIndexFile, conf, localFs);
}
} else {
List<Event> events = Lists.newLinkedList();
@@ -1239,7 +1239,7 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
spillRecord = indexCacheList.get(0);
Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1,
partitions *
MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRecord.writeToFile(indexPath, conf);
+ spillRecord.writeToFile(indexPath, conf, localFs);
maybeSendEventForSpill(events, true, spillRecord, 0, true);
fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
//No need to populate finalIndexFile, finalOutputFile etc when
finalMerge is disabled
@@ -1256,7 +1256,7 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = spillFileIndexPaths.get(i);
- indexCacheList.add(new TezSpillRecord(indexFileName, conf));
+ indexCacheList.add(new TezSpillRecord(indexFileName, localFs));
}
//Check if it is needed to do final merge. Or else, exit early.
@@ -1309,7 +1309,7 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
outputBytesWithOverheadCounter.increment(rawLength);
sr.putIndex(rec, i);
}
- sr.writeToFile(finalIndexFile, conf);
+ sr.writeToFile(finalIndexFile, conf, localFs);
} finally {
finalOut.close();
}
@@ -1392,7 +1392,7 @@ public final class DefaultSorter extends ExternalSorter
implements IndexedSortab
}
}
numShuffleChunks.setValue(1); //final merge has happened
- spillRec.writeToFile(finalIndexFile, conf);
+ spillRec.writeToFile(finalIndexFile, conf, localFs);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 30d1adb..9bf1517 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -48,6 +50,7 @@ public abstract class BaseUnorderedPartitionedKVWriter
extends KeyValuesWriter {
protected final OutputContext outputContext;
protected final Configuration conf;
+ protected final RawLocalFileSystem localFs;
protected final Partitioner partitioner;
protected final Class keyClass;
protected final Class valClass;
@@ -105,6 +108,11 @@ public abstract class BaseUnorderedPartitionedKVWriter
extends KeyValuesWriter {
public BaseUnorderedPartitionedKVWriter(OutputContext outputContext,
Configuration conf, int numOutputs) {
this.outputContext = outputContext;
this.conf = conf;
+ try {
+ this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
this.numPartitions = numOutputs;
// k/v serialization
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index c979cc0..9e87098 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -725,7 +725,7 @@ public class UnorderedPartitionedKVWriter extends
BaseUnorderedPartitionedKVWrit
TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
TezSpillRecord sr = new TezSpillRecord(1);
sr.putIndex(rec, 0);
- sr.writeToFile(finalIndexPath, conf);
+ sr.writeToFile(finalIndexPath, conf, localFs);
BitSet emptyPartitions = new BitSet();
if (outputRecordsCounter.getValue() == 0) {
@@ -1055,7 +1055,7 @@ public class UnorderedPartitionedKVWriter extends
BaseUnorderedPartitionedKVWrit
}
deleteIntermediateSpills();
}
- finalSpillRecord.writeToFile(finalIndexPath, conf);
+ finalSpillRecord.writeToFile(finalIndexPath, conf, localFs);
fileOutputBytesCounter.increment(indexFileSizeEstimate);
LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : "
+ numSpills.get() + " spills");
}
@@ -1148,7 +1148,7 @@ public class UnorderedPartitionedKVWriter extends
BaseUnorderedPartitionedKVWrit
throws IOException {
if (spillPathDetails.indexFilePath != null) {
//write the index record
- spillRecord.writeToFile(spillPathDetails.indexFilePath, conf);
+ spillRecord.writeToFile(spillPathDetails.indexFilePath, conf, localFs);
} else {
//add to cache
SpillInfo spillInfo = new SpillInfo(spillRecord,
spillPathDetails.outputFilePath);
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 441f1c2..92d45cf 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
@@ -67,6 +69,7 @@ public class OrderedPartitionedKVOutput extends
AbstractLogicalOutput {
protected ExternalSorter sorter;
protected Configuration conf;
+ private RawLocalFileSystem localFs;
protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
private long startTime;
private long endTime;
@@ -88,6 +91,8 @@ public class OrderedPartitionedKVOutput extends
AbstractLogicalOutput {
public synchronized List<Event> initialize() throws IOException {
this.startTime = System.nanoTime();
this.conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+
// Initializing this parametr in this conf since it is used in multiple
// places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
// TezMerger, etc.
@@ -205,7 +210,7 @@ public class OrderedPartitionedKVOutput extends
AbstractLogicalOutput {
String auxiliaryService =
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled,
isLastEvent,
- getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(),
conf),
+ getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(),
localFs),
getNumPhysicalOutputs(), sendEmptyPartitionDetails,
getContext().getUniqueIdentifier(),
sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(),
auxiliaryService, deflater);
}
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index f61c7e5..cc918fa 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -149,7 +149,7 @@ public class TestShuffleUtils {
startOffset += partLen;
spillRecord.putIndex(indexRecord, i);
}
- spillRecord.writeToFile(path, conf);
+ spillRecord.writeToFile(path, conf, FileSystem.getLocal(conf).getRaw());
return path;
}
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 6d30448..ec0eeee 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -59,7 +59,9 @@ import
org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -126,11 +128,10 @@ public class TestFetcher {
doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
FetcherOrderedGrouped fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0,
- null, conf, false, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter,
wrongReduceErrsCounter, APP_ID, DAG_ID,
- false, false, true, false);
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0, null, conf,
+ getRawFs(conf), false, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter,
+ wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
fetcher.call();
verify(scheduler).getMapsForHost(mapHost);
@@ -154,11 +155,10 @@ public class TestFetcher {
final boolean DISABLE_LOCAL_FETCH = false;
MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
FetcherOrderedGrouped fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0,
- null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter,
wrongReduceErrsCounter, APP_ID, DAG_ID,
- false, false, true, false);
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0, null, conf,
+ getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex",
mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter,
+ wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
// when local mode is enabled and host and port matches use local fetch
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -172,11 +172,10 @@ public class TestFetcher {
// if hostname does not match use http
mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1);
fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0,
- null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter,
wrongReduceErrsCounter, APP_ID, DAG_ID,
- false, false, true, false);
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0, null, conf,
+ getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex",
mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter,
+ wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -188,11 +187,10 @@ public class TestFetcher {
// if port does not match use http
mapHost = new MapHost(HOST, PORT + 1, 0, 1);
fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0,
- null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter,
wrongReduceErrsCounter, APP_ID, DAG_ID,
- false, false, true, false);
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0, null, conf,
+ getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex",
mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter,
+ wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -203,11 +201,10 @@ public class TestFetcher {
//if local fetch is not enabled
mapHost = new MapHost(HOST, PORT, 0, 1);
- fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
null, false, 0,
- null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter,
APP_ID, DAG_ID,
- false, false, true, false);
+ fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
null, false, 0, null,
+ conf, getRawFs(conf), DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex",
mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter,
+ wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -228,10 +225,10 @@ public class TestFetcher {
when(inputContext.getSourceVertexName()).thenReturn("");
MapHost host = new MapHost(HOST, PORT, 1, 1);
- FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle, null, false, 0,
- null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter,
APP_ID, DAG_ID,
- false, false, true, false);
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle,
+ null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src
vertex", host,
+ ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter,
+ connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false,
false, true, false);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -338,10 +335,10 @@ public class TestFetcher {
when(inputContext.getSourceVertexName()).thenReturn("");
MapHost host = new MapHost(HOST, PORT, 1, 1);
- FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle, null, false, 0,
- null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter,
APP_ID, DAG_ID,
- false, false, true, false);
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle,
+ null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src
vertex", host,
+ ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter,
+ connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false,
false, true, false);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -413,10 +410,10 @@ public class TestFetcher {
when(inputContext.getSourceVertexName()).thenReturn("");
MapHost host = new MapHost(HOST, PORT, 1, 2);
- FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle, null, false, 0,
- null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter,
APP_ID, DAG_ID,
- false, false, true, false);
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle,
+ null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src
vertex", host,
+ ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter,
+ connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false,
false, true, false);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -587,10 +584,10 @@ public class TestFetcher {
HttpConnectionParams httpConnectionParams =
ShuffleUtils.getHttpConnectionParams(conf);
final MapHost host = new MapHost(HOST, PORT, 1, 1);
- FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null,
scheduler, merger, shuffle, null, false, 0,
- null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter,
APP_ID, DAG_ID,
- false, false, true, false);
+ FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null,
scheduler, merger, shuffle,
+ null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src
vertex", host,
+ ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter,
+ connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false,
false, true, false);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
@@ -676,12 +673,10 @@ public class TestFetcher {
HttpConnectionParams httpConnectionParams =
ShuffleUtils.getHttpConnectionParams(conf);
final MapHost host = new MapHost(HOST, PORT, 1, 1);
FetcherOrderedGrouped mockFetcher =
- new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
shuffle, jobMgr,
- false, 0,
- null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter,
wrongReduceErrsCounter, APP_ID, DAG_ID,
- true, false, true, false);
+ new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
shuffle, jobMgr, false,
+ 0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex",
host, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter,
+ wrongReduceErrsCounter, APP_ID, DAG_ID, true, false, true, false);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -743,12 +738,10 @@ public class TestFetcher {
MergeManager merger = mock(MergeManager.class);
Shuffle shuffle = mock(Shuffle.class);
MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
- FetcherOrderedGrouped fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null,
false, 0,
- null, conf, false, HOST, PORT, "src vertex", mapHost,
ioErrsCounter,
- wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter,
wrongReduceErrsCounter, APP_ID, DAG_ID,
- false, false, true, false);
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler,
merger, shuffle,
+ null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src
vertex", mapHost,
+ ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter,
+ connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false,
false, true, false);
fetcher.populateRemainingMap(new
LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
Iterator<Entry<String, InputAttemptIdentifier>> iterator =
fetcher.remaining.entrySet().iterator();
@@ -758,4 +751,13 @@ public class TestFetcher {
Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key)
== 0);
}
}
+
+ private RawLocalFileSystem getRawFs(Configuration conf) {
+ try {
+ return (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ // Rethrow unchecked so getRawFs(conf) can be used inline as a constructor argument.
+ throw new RuntimeException(e);
+ }
+ }
}