sidseth commented on a change in pull request #50: TEZ-4088: Create in-memory 
ifile writer for transferring small payload
URL: https://github.com/apache/tez/pull/50#discussion_r333226349
 
 

 ##########
 File path: 
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
 ##########
 @@ -71,6 +73,223 @@
   private static final String INCOMPLETE_READ = "Requested to read %d got %d";
   private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is 
greater than the max allowed of %d";
 
+  /**
+   * IFileWriter which stores data in memory up to a specified limit, beyond
+   * which it falls back to a file based writer. It creates files lazily on
+   * a need basis and avoids any disk hit (in cases where data fits entirely
+   * in memory).
+   * <p>
+   * This class should not make any changes to IFile logic and should just 
flip streams
+   * from mem to disk on need basis.
+   *
+   * During write, it verifies whether the uncompressed payload can fit in
+   * memory. If so, it is stored in the buffer. Otherwise, it falls back to
+   * the file based writer. Note that data stored internally would be in
+   * compressed format (if a codec is provided). However, for easier
+   * comparison and spill over, the check is done on the uncompressed payload
+   * size. This is done intentionally, as it is not possible to know the
+   * compressed data length upfront.
+   */
+  public static class FileBackedInMemIFileWriter extends Writer {
+
+    private FileSystem fs;
+    private boolean bufferFull;
+
+    // For lazy creation of file
+    private TezTaskOutput taskOutput;
+    private int totalSize;
+    private int cacheSize;
+
+    @VisibleForTesting
+    private Path outputPath;
+    private CompressionCodec fileCodec;
+    private BoundedByteArrayOutputStream cacheStream;
+
+    private static final int checksumSize = 
IFileOutputStream.getCheckSumSize();
+
    /**
     * Creates a writer that buffers IFile output in memory and lazily spills
     * to a file once the in-memory limit is exceeded.
     * <p>
     * Note that we do not allow compression in the in-mem stream.
     * When spilled over to file, compression gets enabled.
     *
     * @param conf configuration passed through to the base {@link Writer}
     * @param fs filesystem on which the spill file is created on overflow
     * @param taskOutput used to lazily allocate the on-disk output path
     *                   only when a spill actually happens
     * @param keyClass key class used for serialization
     * @param valueClass value class used for serialization
     * @param codec compression codec; deliberately NOT passed to super, so
     *              it only takes effect after spilling over to the file
     * @param writesCounter counter for record writes
     * @param serializedBytesCounter counter for serialized bytes
     * @param cacheSize requested in-memory buffer size in bytes; raised to
     *                  the minimum base size if too small
     * @throws IOException if the underlying writer cannot be created
     */
    public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
        TezTaskOutput taskOutput, Class keyClass, Class valueClass,
        CompressionCodec codec,
        TezCounter writesCounter,
        TezCounter serializedBytesCounter,
        int cacheSize) throws IOException {
      // Super gets an in-memory bounded buffer and a null codec: in-mem data
      // stays uncompressed so the size accounting matches the raw payload.
      super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
          keyClass, valueClass, null, writesCounter, serializedBytesCounter);
      this.fs = fs;
      this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
      this.taskOutput = taskOutput;
      // Defensive: if no wrapped stream is available, behave as if the
      // buffer has already spilled so all writes go straight to the file path.
      this.bufferFull = (cacheStream == null);
      this.cacheSize = (cacheStream == null) ? 0 :
          Math.max(getBaseCacheSize(), cacheStream.available());
      this.fileCodec = codec;
    }
+
+    /**
+     * For basic cache size checks: header + checksum + EOF marker
+     *
+     * @return size of the base cache needed
+     */
+    static int getBaseCacheSize() {
+      return (checksumSize + (2 * WritableUtils.getVIntSize(EOF_MARKER))
+          + HEADER.length);
+    }
+
+    /**
+     * Create in mem stream. In it is too small, adjust it's size
+     *
+     * @param size
+     * @return in memory stream
+     */
+    public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
+      int resize = Math.max(getBaseCacheSize(), size);
+      return new BoundedByteArrayOutputStream(resize);
+    }
+
    /**
     * Flip over from the in-memory buffer to a file based writer.
     *
     * 1. Buffer content format: HEADER + real data + CHECKSUM, where the
     * checksum covers the real data only.
     * 2. Before flipping, close the checksum stream, so that the checksum is
     * written out into the buffer.
     * 3. Create the relevant file based stream chain (with compression, if a
     * codec was supplied).
     * 4. Write the header and then the buffered real data to the file.
     *
     * @throws IOException on any failure creating or writing the spill file
     */
    private void resetToFileBasedWriter() throws IOException {
      // Close out stream, so that data checksums are written.
      // Buf contents = HEADER + real data + CHECKSUM
      this.out.close();

      // Get the buffer which contains data in memory
      BoundedByteArrayOutputStream bout =
          (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();

      // Create new file based writer. The path is allocated lazily here so
      // that fully in-memory writes never touch the disk.
      if (outputPath == null) {
        outputPath = taskOutput.getOutputFileForWrite();
      }
      LOG.info("Creating file " + outputPath);
      FSDataOutputStream newRawOut = fs.create(outputPath);
      this.rawOut = newRawOut;

      // Rebuild the stream chain: rawOut <- checksum <- (optional
      // compression) <- out, mirroring the file based Writer setup.
      this.checksumOut = new IFileOutputStream(this.rawOut);
      if (fileCodec != null) {
        this.compressor = CodecPool.getCompressor(fileCodec);
        if (this.compressor != null) {
          this.compressor.reset();
          this.compressOutput = true;
          this.compressedOut = fileCodec.createOutputStream(checksumOut, compressor);
          this.out = new FSDataOutputStream(this.compressedOut, null);
        } else {
          // Fall back to uncompressed output rather than failing the write.
          LOG.warn("Could not obtain compressor from CodecPool");
          this.out = new FSDataOutputStream(checksumOut, null);
        }
      } else {
        this.out = new FSDataOutputStream(checksumOut, null);
      }

      // Write header to file. The flag is cleared first — presumably
      // writeHeader() skips work once headerWritten is set; verify in Writer.
      headerWritten = false;
      writeHeader(newRawOut);

      // Write the real data only: skip the header at the front and the
      // checksum at the end, as both belong to the in-memory stream.
      int sPos = HEADER.length;
      int len = (bout.size() - checksumSize - HEADER.length);
      this.out.write(bout.getBuffer(), sPos, len);

      // Mark as spilled and release the in-memory contents.
      bufferFull = true;
      bout.reset();
    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } finally {
+        if (this.rawOut != null) {
+          this.rawOut.close();
+        }
+      }
+    }
+
+    boolean shouldWriteToDisk() {
+      return totalSize >= (cacheSize - (checksumSize +
+          (2 * WritableUtils.getVIntSize(EOF_MARKER))));
+    }
+
+    @Override
+    protected void writeKVPair(byte[] keyData, int keyPos, int keyLength,
+        byte[] valueData, int valPos, int valueLength) throws IOException {
+      if (!bufferFull) {
+        // Compute actual payload size: write RLE marker, length info and then 
entire data.
+        totalSize += ((prevKey == REPEAT_KEY) ? V_END_MARKER_SIZE : 0)
+            + WritableUtils.getVIntSize(keyLength) + keyLength
+            + WritableUtils.getVIntSize(valueLength) + valueLength;
+
+        if (shouldWriteToDisk()) {
+          resetToFileBasedWriter();
+        }
+      }
+      super.writeKVPair(keyData, keyPos, keyLength, valueData, valPos, 
valueLength);
+    }
+
+    @Override
+    protected void writeValue(byte[] data, int offset, int length) throws 
IOException {
+      if (!bufferFull) {
+        totalSize += ((prevKey != REPEAT_KEY) ? RLE_MARKER_SIZE : 0)
+            + WritableUtils.getVIntSize(length) + length;
+
+        if (shouldWriteToDisk()) {
+          resetToFileBasedWriter();
+        }
+      }
+      super.writeValue(data, offset, length);
+    }
+
+    /**
+     * Check if data was flushed to disk.
+     *
+     * @return whether data is flushed to disk ot not
+     */
+    public boolean isDataFlushedToDisk() {
+      return bufferFull;
+    }
+
+    /**
+     * Get cached data if any
+     *
+     * @return if data is not flushed to disk, it returns in-mem contents
+     */
+    public byte[] getData() {
+      if (!isDataFlushedToDisk()) {
+        byte[] buf = new byte[cacheStream.size()];
 
 Review comment:
   Nit: Not sure if this really matters. Could avoid this copy by wrapping in a 
ByteBuffer (another copy is happening anyway when serializing to protobuf)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to