sidseth commented on a change in pull request #50: TEZ-4088: Create in-memory ifile writer for transferring small payload
URL: https://github.com/apache/tez/pull/50#discussion_r333217584
##########
File path: tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
##########
@@ -71,6 +73,223 @@
private static final String INCOMPLETE_READ = "Requested to read %d got %d";
private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is greater than the max allowed of %d";
+ /**
+ * IFileWriter which stores data in memory for specified limit, beyond
+ * which it falls back to file based writer. It creates files lazily on
+ * need basis and avoids any disk hit (in cases, where data fits entirely in mem).
+ * <p>
+ * This class should not make any changes to IFile logic and should just flip streams
+ * from mem to disk on need basis.
+ *
+ * During write, it verifies whether uncompressed payload can fit in memory. If so, it would
+ * store in buffer. Otherwise, it falls back to file based writer. Note that data stored
+ * internally would be in compressed format (if codec is provided). However, for easier
+ * comparison and spill over, uncompressed payload check is done. This is
+ * done intentionally, as it is not possible to know compressed data length
+ * upfront.
+ */
+ public static class FileBackedInMemIFileWriter extends Writer {
+
+ private FileSystem fs;
+ private boolean bufferFull;
+
+ // For lazy creation of file
+ private TezTaskOutput taskOutput;
+ private int totalSize;
+ private int cacheSize;
+
+ @VisibleForTesting
+ private Path outputPath;
+ private CompressionCodec fileCodec;
+ private BoundedByteArrayOutputStream cacheStream;
+
+ private static final int checksumSize = IFileOutputStream.getCheckSumSize();
+
+ /**
+ * Note that we do not allow compression in in-mem stream.
+ * When spilled over to file, compression gets enabled.
+ *
+ * @param conf
+ * @param fs
+ * @param taskOutput
+ * @param keyClass
+ * @param valueClass
+ * @param codec
+ * @param writesCounter
+ * @param serializedBytesCounter
+ * @param cacheSize
+ * @throws IOException
+ */
+ public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
+ TezTaskOutput taskOutput, Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ TezCounter writesCounter,
+ TezCounter serializedBytesCounter,
+ int cacheSize) throws IOException {
+ super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
+ keyClass, valueClass, null, writesCounter, serializedBytesCounter);
+ this.fs = fs;
+ this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
+ this.taskOutput = taskOutput;
+ this.bufferFull = (cacheStream == null);
+ this.cacheSize = (cacheStream == null) ? 0 :
+ Math.max(getBaseCacheSize(), cacheStream.available());
Review comment:
Nit: `cacheSize` could be replaced by an `availableDataSize` field that combines this calculation with the one done in `shouldWriteToDisk`. That would make the code a little easier to read:
```
return totalSize >= (cacheSize - (checksumSize +
    (2 * WritableUtils.getVIntSize(EOF_MARKER))));
```
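A minimal, standalone sketch of the refactor suggested above, assuming the budget is computed once in the constructor so that `shouldWriteToDisk` reduces to a single comparison. The class name `AvailableDataSizeSketch` and the helpers `recordWrite` and `getAvailableDataSize` are hypothetical and not part of Tez. `getVIntSize` reimplements the size rule of Hadoop's `WritableUtils.getVIntSize` so the example runs without Hadoop on the classpath; `EOF_MARKER` is assumed to be `-1` as in `IFile`, and the 4-byte checksum and 512-byte cache are example values only. The real class derives its cache size from `getBaseCacheSize()` and `cacheStream.available()`, which this sketch does not reproduce.

```java
/**
 * Hypothetical illustration of the suggested refactor (not the actual Tez class):
 * the reserved trailer space is subtracted once, up front, as availableDataSize.
 */
class AvailableDataSizeSketch {

  // Assumption: matches IFile.EOF_MARKER (-1). The snippet above counts it twice.
  static final int EOF_MARKER = -1;

  private final int availableDataSize;
  private int totalSize;

  AvailableDataSizeSketch(int cacheSize, int checksumSize) {
    // Reserve room for the trailing checksum and the two EOF vints once.
    this.availableDataSize =
        cacheSize - (checksumSize + 2 * getVIntSize(EOF_MARKER));
  }

  /** Reimplements the size rule of Hadoop's WritableUtils.getVIntSize. */
  static int getVIntSize(long i) {
    if (i >= -112 && i <= 127) {
      return 1; // small values fit in a single byte
    }
    if (i < 0) {
      i ^= -1L; // one's complement for negative values
    }
    int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
    return (dataBits + 7) / 8 + 1; // data bytes plus the length byte
  }

  /** Hypothetical hook: account for an uncompressed record of this length. */
  void recordWrite(int uncompressedLength) {
    totalSize += uncompressedLength;
  }

  /** The spill check becomes a single comparison against the precomputed budget. */
  boolean shouldWriteToDisk() {
    return totalSize >= availableDataSize;
  }

  int getAvailableDataSize() {
    return availableDataSize;
  }

  public static void main(String[] args) {
    AvailableDataSizeSketch s = new AvailableDataSizeSketch(512, 4);
    System.out.println(s.getAvailableDataSize()); // 506
    s.recordWrite(500);
    System.out.println(s.shouldWriteToDisk());    // false
    s.recordWrite(6);
    System.out.println(s.shouldWriteToDisk());    // true
  }
}
```

With a zero-sized cache (the `cacheStream == null` case), the precomputed budget is negative, so `shouldWriteToDisk()` returns `true` before any write, which matches the behaviour of the original expression.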
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
With regards,
Apache Git Services