mehakmeet commented on a change in pull request #3406:
URL: https://github.com/apache/hadoop/pull/3406#discussion_r709125850



##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -230,27 +227,167 @@ public synchronized void write(final byte[] data, final 
int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
+    DataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(data, off, length);
+    int remainingCapacity = block.remainingCapacity();
+
+    if (written < length) {
+      // Number of bytes to write is more than the data block capacity,
+      // trigger an upload and then write on the next block.
+      LOG.debug("writing more data than block has capacity -triggering upload");
+      uploadCurrentBlock();
+      // tail recursion is mildly expensive, but given buffer sizes must be MB.
+      // it's unlikely to recurse very deeply.
+      this.write(data, off + written, length - written);
+    } else {
+      if (remainingCapacity == 0) {
+        // the whole buffer is done, trigger an upload
+        uploadCurrentBlock();
+      }
+    }
+    incrementWriteOps();
+  }
 
-    int currentOffset = off;
-    int writableBytes = bufferSize - bufferIndex;
-    int numberOfBytesToWrite = length;
-
-    while (numberOfBytesToWrite > 0) {
-      if (writableBytes <= numberOfBytesToWrite) {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-        bufferIndex += writableBytes;
-        writeCurrentBufferToService();
-        currentOffset += writableBytes;
-        numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-      } else {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-        bufferIndex += numberOfBytesToWrite;
-        numberOfBytesToWrite = 0;
+  /**
+   * Demand create a destination block.
+   *
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized DataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      activeBlock = blockFactory
+          .create(blockCount, this.blockSize, outputStreamStatistics);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Start an asynchronous upload of the current block.
+   *
+   * @throws IOException Problems opening the destination for upload,
+   *                     initializing the upload, or if a previous operation has failed.
+   */
+  private synchronized void uploadCurrentBlock() throws IOException {
+    checkState(hasActiveBlock(), "No active block");
+    LOG.debug("Writing block # {}", blockCount);
+    try {
+      uploadBlockAsync(getActiveBlock(), false, false);
+    } finally {
+      // set the block to null, so the next write will create a new block.
+      clearActiveBlock();
+    }
+  }
+
+  /**
+   * Upload a block of data.
+   * This will take the block.
+   *
+   * @param blockToUpload block to upload
+   * @throws IOException     upload failure
+   * @throws PathIOException if too many blocks were written

Review comment:
       Sorry, this isn't valid here. I probably carried over similar Javadoc
from S3ABlockOutputStream, which has a limit on how many blocks can be
uploaded. Nice catch 👍




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to