rbalamohan commented on a change in pull request #50: TEZ-4088: Create
in-memory ifile writer for transferring small payload
URL: https://github.com/apache/tez/pull/50#discussion_r333284227
##########
File path:
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
##########
@@ -71,6 +73,223 @@
private static final String INCOMPLETE_READ = "Requested to read %d got %d";
private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is
greater than the max allowed of %d";
+ /**
+ * IFileWriter which stores data in memory for specified limit, beyond
+ * which it falls back to file based writer. It creates files lazily on
+ * need basis and avoids any disk hit (in cases, where data fits entirely in mem).
+ * <p>
+ * This class should not make any changes to IFile logic and should just flip streams
+ * from mem to disk on need basis.
+ *
+ * During write, it verifies whether uncompressed payload can fit in memory. If so, it would
+ * store in buffer. Otherwise, it falls back to file based writer. Note that data stored
+ * internally would be in compressed format (if codec is provided). However, for easier
+ * comparison and spill over, uncompressed payload check is done. This is
+ * done intentionally, as it is not possible to know compressed data length
+ * upfront.
+ */
+ public static class FileBackedInMemIFileWriter extends Writer {
+
+ private FileSystem fs;
+ private boolean bufferFull;
+
+ // For lazy creation of file
+ private TezTaskOutput taskOutput;
+ private int totalSize;
+ private int cacheSize;
+
+ @VisibleForTesting
+ private Path outputPath;
+ private CompressionCodec fileCodec;
+ private BoundedByteArrayOutputStream cacheStream;
+
+ private static final int checksumSize =
IFileOutputStream.getCheckSumSize();
+
+ /**
+ * Note that we do not allow compression in in-mem stream.
+ * When spilled over to file, compression gets enabled.
+ *
+ * @param conf
+ * @param fs
+ * @param taskOutput
+ * @param keyClass
+ * @param valueClass
+ * @param codec
+ * @param writesCounter
+ * @param serializedBytesCounter
+ * @param cacheSize
+ * @throws IOException
+ */
+ public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
+ TezTaskOutput taskOutput, Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ TezCounter writesCounter,
+ TezCounter serializedBytesCounter,
+ int cacheSize) throws IOException {
+ super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
+ keyClass, valueClass, null, writesCounter, serializedBytesCounter);
+ this.fs = fs;
+ this.cacheStream = (BoundedByteArrayOutputStream)
this.rawOut.getWrappedStream();
+ this.taskOutput = taskOutput;
+ this.bufferFull = (cacheStream == null);
+ this.cacheSize = (cacheStream == null) ? 0 :
+ Math.max(getBaseCacheSize(), cacheStream.available());
+ this.fileCodec = codec;
+ }
+
+ /**
+ * For basic cache size checks: header + checksum + EOF marker
+ *
+ * @return size of the base cache needed
+ */
+ static int getBaseCacheSize() {
+ return (checksumSize + (2 * WritableUtils.getVIntSize(EOF_MARKER))
+ + HEADER.length);
+ }
+
+ /**
+ * Create in mem stream. If it is too small, adjust its size
+ *
+ * @param size
+ * @return in memory stream
+ */
+ public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
+ int resize = Math.max(getBaseCacheSize(), size);
+ return new BoundedByteArrayOutputStream(resize);
+ }
+
+ /**
+ * Flip over from memory to file based writer.
+ *
+ * 1. Content format: HEADER + real data + CHECKSUM. Checksum is for real
+ * data.
+ * 2. Before flipping, close checksum stream, so that checksum is written
+ * out.
+ * 3. Create relevant file based writer.
+ * 4. Write header and then real data.
+ *
+ * @throws IOException
+ */
+ private void resetToFileBasedWriter() throws IOException {
+ // Close out stream, so that data checksums are written.
+ // Buf contents = HEADER + real data + CHECKSUM
+ this.out.close();
+
+ // Get the buffer which contains data in memory
+ BoundedByteArrayOutputStream bout =
+ (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
+
+ // Create new file based writer
+ if (outputPath == null) {
+ outputPath = taskOutput.getOutputFileForWrite();
+ }
+ LOG.info("Creating file " + outputPath);
+ FSDataOutputStream newRawOut = fs.create(outputPath);
Review comment:
Yes, makes sense to create `setupOutputStream()` in `IFile` itself.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services