[ 
https://issues.apache.org/jira/browse/HADOOP-17195?focusedWorklogId=651052&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-651052
 ]

ASF GitHub Bot logged work on HADOOP-17195:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Sep/21 12:16
            Start Date: 15/Sep/21 12:16
    Worklog Time Spent: 10m 
      Work Description: mehakmeet commented on a change in pull request #3406:
URL: https://github.com/apache/hadoop/pull/3406#discussion_r709125850



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -230,27 +227,167 @@ public synchronized void write(final byte[] data, final int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
+    DataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(data, off, length);
+    int remainingCapacity = block.remainingCapacity();
+
+    if (written < length) {
+      // Number of bytes to write is more than the data block capacity,
+      // trigger an upload and then write on the next block.
+      LOG.debug("writing more data than block has capacity -triggering upload");
+      uploadCurrentBlock();
+      // tail recursion is mildly expensive, but given buffer sizes must be MB.
+      // it's unlikely to recurse very deeply.
+      this.write(data, off + written, length - written);
+    } else {
+      if (remainingCapacity == 0) {
+        // the whole buffer is done, trigger an upload
+        uploadCurrentBlock();
+      }
+    }
+    incrementWriteOps();
+  }
 
-    int currentOffset = off;
-    int writableBytes = bufferSize - bufferIndex;
-    int numberOfBytesToWrite = length;
-
-    while (numberOfBytesToWrite > 0) {
-      if (writableBytes <= numberOfBytesToWrite) {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-        bufferIndex += writableBytes;
-        writeCurrentBufferToService();
-        currentOffset += writableBytes;
-        numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-      } else {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-        bufferIndex += numberOfBytesToWrite;
-        numberOfBytesToWrite = 0;
+  /**
+   * Demand create a destination block.
+   *
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized DataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      activeBlock = blockFactory
+          .create(blockCount, this.blockSize, outputStreamStatistics);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Start an asynchronous upload of the current block.
+   *
+   * @throws IOException Problems opening the destination for upload,
+   *                     initializing the upload, or if a previous operation has failed.
+   */
+  private synchronized void uploadCurrentBlock() throws IOException {
+    checkState(hasActiveBlock(), "No active block");
+    LOG.debug("Writing block # {}", blockCount);
+    try {
+      uploadBlockAsync(getActiveBlock(), false, false);
+    } finally {
+      // set the block to null, so the next write will create a new block.
+      clearActiveBlock();
+    }
+  }
+
+  /**
+   * Upload a block of data.
+   * This will take the block.
+   *
+   * @param blockToUpload block to upload
+   * @throws IOException     upload failure
+   * @throws PathIOException if too many blocks were written

Review comment:
       Sorry, this is not valid here. I probably carried over similar Javadoc
from S3ABlockOutputStream, which has a limit on how many blocks can be
uploaded. Nice catch 👍
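
For readers following the hunk above, the block-filling write path can be sketched in isolation. This is only an illustration under stated assumptions: `BlockWriteSketch` and its fields are hypothetical names, a plain byte array stands in for `DataBlocks.DataBlock`, the "upload" just collects full blocks in a list, and a loop replaces the recursive call in the patch. It is not the code in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the block-based write path; not the ABFS classes. */
public class BlockWriteSketch {
  private final int blockSize;
  private byte[] active;   // current block, created on demand
  private int used;        // bytes already written into the active block
  private final List<byte[]> uploaded = new ArrayList<>();

  public BlockWriteSketch(int blockSize) {
    this.blockSize = blockSize;
  }

  /** Copy data into blocks, handing off each block as soon as it is full. */
  public void write(byte[] data, int off, int length) {
    while (length > 0) {
      if (active == null) {
        // Demand-create a block, in the spirit of createBlockIfNeeded().
        active = new byte[blockSize];
        used = 0;
      }
      int n = Math.min(length, blockSize - used);
      System.arraycopy(data, off, active, used, n);
      used += n;
      off += n;
      length -= n;
      if (used == blockSize) {
        // The whole block is full: hand it off.
        uploadCurrentBlock();
      }
    }
  }

  private void uploadCurrentBlock() {
    // A real stream would queue an asynchronous upload here.
    uploaded.add(active);
    // Clear the active block so the next write creates a new one.
    active = null;
  }

  public int uploadedBlocks() {
    return uploaded.size();
  }

  public int pendingBytes() {
    return active == null ? 0 : used;
  }
}
```

With a 4-byte block size, writing 10 bytes in one call hands off two full blocks and leaves 2 bytes pending in a third.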




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 651052)
    Time Spent: 2h 50m  (was: 2h 40m)

> Intermittent OutOfMemory error while performing hdfs CopyFromLocal to abfs 
> ---------------------------------------------------------------------------
>
>                 Key: HADOOP-17195
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17195
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/azure
>    Affects Versions: 3.3.0
>            Reporter: Mehakmeet Singh
>            Assignee: Mehakmeet Singh
>            Priority: Major
>              Labels: abfsactive, pull-request-available
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> An OutOfMemory error occurs because a new ThreadPool is made each time an
> AbfsOutputStream is created. Since the thread pools aren't limited, a lot of
> data is loaded into buffers, which causes the OutOfMemory error.
> Possible fixes:
> - Limit the thread count while performing hdfs copyFromLocal (using the
> -t property).
> - Reduce OUTPUT_BUFFER_SIZE significantly, which would limit the amount of
> buffer loaded by the threads.
> - Don't create new ThreadPools each time an AbfsOutputStream is created, and
> limit the number of ThreadPools each AbfsOutputStream can create.
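
The third option in the list above can be sketched with plain `java.util.concurrent`. This is a minimal illustration, not the fix that was merged: `BoundedUploadSketch`, `maxQueuedBlocks`, and `submit` are hypothetical names, and the semaphore is an assumed way to cap how many buffered blocks a single stream may have queued or in flight on a pool shared by all streams.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: one shared pool, with a per-stream cap on pending uploads. */
public class BoundedUploadSketch {
  private final ExecutorService sharedPool;  // created once, reused by every stream
  private final Semaphore permits;           // limits blocks queued or in flight

  public BoundedUploadSketch(ExecutorService sharedPool, int maxQueuedBlocks) {
    this.sharedPool = sharedPool;
    this.permits = new Semaphore(maxQueuedBlocks);
  }

  /** Blocks the caller while the cap is reached, so buffers cannot pile up. */
  public void submit(Runnable upload) throws InterruptedException {
    permits.acquire();
    try {
      sharedPool.execute(() -> {
        try {
          upload.run();
        } finally {
          permits.release();  // free the slot once the upload finishes
        }
      });
    } catch (RuntimeException e) {
      permits.release();      // the task was rejected, so give the slot back
      throw e;
    }
  }

  public int availablePermits() {
    return permits.availablePermits();
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    BoundedUploadSketch stream = new BoundedUploadSketch(pool, 2);
    for (int i = 0; i < 6; i++) {
      final int block = i;
      stream.submit(() -> System.out.println("uploading block " + block));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}
```

Because `submit` blocks the writer once the cap is reached, the memory held in pending blocks is bounded by roughly the cap times the block size per stream, regardless of how many streams share the pool.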



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
