mukund-thakur commented on a change in pull request #3406:
URL: https://github.com/apache/hadoop/pull/3406#discussion_r708898637



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -625,8 +644,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
     return op;
   }
 
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
-      AbfsLease lease) {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(

Review comment:
       Follow one or two variables per line.
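       For instance, a minimal sketch of the suggested layout (parameter
       names taken from the old signature shown above, purely to
       illustrate the style):

           private AbfsOutputStreamContext populateAbfsOutputStreamContext(
               boolean isAppendBlob,
               AbfsLease lease) {
             // ... build and return the context ...
           }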

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+public class DataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBlocks.class);
+
+  private DataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   *
+   * @param b   byte array containing data.
+   * @param off offset in array where to start.
+   * @param len number of bytes to be written.
+   * @throws NullPointerException      for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  public static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   *
+   * @param KeyToBufferDir Key to buffer directory config for a FS.
+   * @param configuration  factory configurations.
+   * @param name           factory name -the option from {@link CommonConfigurationKeys}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  public static BlockFactory createFactory(String KeyToBufferDir,
+      Configuration configuration,
+      String name) {
+    LOG.debug("Creating DataFactory of type : {}", name);
+    switch (name) {
+    case CommonConfigurationKeys.DATA_BLOCKS_BUFFER_ARRAY:
+      return new ArrayBlockFactory(KeyToBufferDir, configuration);
+    case CommonConfigurationKeys.DATA_BLOCKS_BUFFER_DISK:
+      return new DiskBlockFactory(KeyToBufferDir, configuration);
+    case CommonConfigurationKeys.DATA_BLOCKS_BYTEBUFFER:
+      return new ByteBufferBlockFactory(KeyToBufferDir, configuration);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {
+    private final File file;
+    private InputStream uploadStream;
+    private byte[] byteArray;
+    private boolean isClosed;
+
+    public BlockUploadData(byte[] byteArray) {
+      this.file = null;
+      this.uploadStream = null;
+
+      this.byteArray = requireNonNull(byteArray);
+    }
+
+    /**
+     * File constructor; input stream will be null.
+     *
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);
+      this.file = file;
+      this.uploadStream = null;
+      this.byteArray = null;
+    }
+
+    /**
+     * Stream constructor, file field will be null.
+     *
+     * @param uploadStream stream to upload.
+     */
+    BlockUploadData(InputStream uploadStream) {
+      requireNonNull(uploadStream, "rawUploadStream");
+      this.uploadStream = uploadStream;
+      this.file = null;
+      this.byteArray = null;
+    }
+
+    /**
+     * Predicate: does this instance contain a file reference.
+     *
+     * @return true if there is a file.
+     */
+    boolean hasFile() {
+      return file != null;
+    }
+
+    /**
+     * Get the file, if there is one.
+     *
+     * @return the file for uploading, or null.
+     */
+    File getFile() {
+      return file;
+    }
+
+    /**
+     * Get the raw upload stream, if the object was
+     * created with one.
+     *
+     * @return the upload stream or null.
+     */
+    InputStream getUploadStream() {
+      return uploadStream;
+    }
+
+    /**
+     * A Method to return the uploadBlock into byteArray.
+     *
+     * @return byte[] after converting the uploadBlock.
+     * @throws IOException throw if an exception is caught while reading
+     *                     File/InputStream or closing InputStream.
+     */
+    public byte[] toByteArray() throws IOException {
+      Preconditions.checkState(!isClosed, "Block is closed");
+      if (byteArray != null) {
+        return byteArray;
+      }
+      if (file != null) {
+        return FileUtils.readFileToByteArray(file);
+      }
+      byteArray = IOUtils.toByteArray(uploadStream);
+      uploadStream.close();

Review comment:
       close using IOUtils.close()?
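       For example, a null-safe sketch using the cleanupWithLogger helper
       this file already statically imports (one option among several,
       not the only fix):

           byteArray = IOUtils.toByteArray(uploadStream);
           // null-safe close; logs rather than rethrows close failures.
           cleanupWithLogger(LOG, uploadStream);
           uploadStream = null;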

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+public class DataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBlocks.class);
+
+  private DataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   *
+   * @param b   byte array containing data.
+   * @param off offset in array where to start.
+   * @param len number of bytes to be written.
+   * @throws NullPointerException      for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  public static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   *
+   * @param KeyToBufferDir Key to buffer directory config for a FS.
+   * @param configuration  factory configurations.
+   * @param name           factory name -the option from {@link CommonConfigurationKeys}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  public static BlockFactory createFactory(String KeyToBufferDir,
+      Configuration configuration,
+      String name) {
+    LOG.debug("Creating DataFactory of type : {}", name);
+    switch (name) {
+    case CommonConfigurationKeys.DATA_BLOCKS_BUFFER_ARRAY:
+      return new ArrayBlockFactory(KeyToBufferDir, configuration);
+    case CommonConfigurationKeys.DATA_BLOCKS_BUFFER_DISK:
+      return new DiskBlockFactory(KeyToBufferDir, configuration);
+    case CommonConfigurationKeys.DATA_BLOCKS_BYTEBUFFER:
+      return new ByteBufferBlockFactory(KeyToBufferDir, configuration);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {
+    private final File file;
+    private InputStream uploadStream;
+    private byte[] byteArray;
+    private boolean isClosed;
+
+    public BlockUploadData(byte[] byteArray) {
+      this.file = null;
+      this.uploadStream = null;
+
+      this.byteArray = requireNonNull(byteArray);
+    }
+
+    /**
+     * File constructor; input stream will be null.
+     *
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);
+      this.file = file;
+      this.uploadStream = null;
+      this.byteArray = null;
+    }
+
+    /**
+     * Stream constructor, file field will be null.
+     *
+     * @param uploadStream stream to upload.
+     */
+    BlockUploadData(InputStream uploadStream) {
+      requireNonNull(uploadStream, "rawUploadStream");
+      this.uploadStream = uploadStream;
+      this.file = null;
+      this.byteArray = null;
+    }
+
+    /**
+     * Predicate: does this instance contain a file reference.
+     *
+     * @return true if there is a file.
+     */
+    boolean hasFile() {
+      return file != null;
+    }
+
+    /**
+     * Get the file, if there is one.
+     *
+     * @return the file for uploading, or null.
+     */
+    File getFile() {
+      return file;
+    }
+
+    /**
+     * Get the raw upload stream, if the object was
+     * created with one.
+     *
+     * @return the upload stream or null.
+     */
+    InputStream getUploadStream() {
+      return uploadStream;
+    }
+
+    /**
+     * A Method to return the uploadBlock into byteArray.
+     *
+     * @return byte[] after converting the uploadBlock.
+     * @throws IOException throw if an exception is caught while reading
+     *                     File/InputStream or closing InputStream.
+     */
+    public byte[] toByteArray() throws IOException {
+      Preconditions.checkState(!isClosed, "Block is closed");
+      if (byteArray != null) {
+        return byteArray;
+      }
+      if (file != null) {
+        return FileUtils.readFileToByteArray(file);
+      }
+      byteArray = IOUtils.toByteArray(uploadStream);
+      uploadStream.close();

Review comment:
       check for uploadStream not null?
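       A sketch of one possible guard; by the constructors above, if both
       byteArray and file are null the stream must be the remaining source,
       so a checkState would make that invariant explicit:

           Preconditions.checkState(uploadStream != null,
               "Block has no upload stream");
           byteArray = IOUtils.toByteArray(uploadStream);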

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -230,27 +227,167 @@ public synchronized void write(final byte[] data, final int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
+    DataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(data, off, length);
+    int remainingCapacity = block.remainingCapacity();
+
+    if (written < length) {
+      // Number of bytes to write is more than the data block capacity,
+      // trigger an upload and then write on the next block.
+      LOG.debug("writing more data than block has capacity -triggering upload");
+      uploadCurrentBlock();
+      // tail recursion is mildly expensive, but given buffer sizes must be MB.
+      // it's unlikely to recurse very deeply.
+      this.write(data, off + written, length - written);
+    } else {
+      if (remainingCapacity == 0) {
+        // the whole buffer is done, trigger an upload
+        uploadCurrentBlock();
+      }
+    }
+    incrementWriteOps();
+  }
 
-    int currentOffset = off;
-    int writableBytes = bufferSize - bufferIndex;
-    int numberOfBytesToWrite = length;
-
-    while (numberOfBytesToWrite > 0) {
-      if (writableBytes <= numberOfBytesToWrite) {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-        bufferIndex += writableBytes;
-        writeCurrentBufferToService();
-        currentOffset += writableBytes;
-        numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-      } else {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-        bufferIndex += numberOfBytesToWrite;
-        numberOfBytesToWrite = 0;
+  /**
+   * Demand create a destination block.
+   *
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized DataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      activeBlock = blockFactory
+          .create(blockCount, this.blockSize, outputStreamStatistics);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Start an asynchronous upload of the current block.
+   *
+   * @throws IOException Problems opening the destination for upload,
+   *                     initializing the upload, or if a previous operation has failed.
+   */
+  private synchronized void uploadCurrentBlock() throws IOException {
+    checkState(hasActiveBlock(), "No active block");
+    LOG.debug("Writing block # {}", blockCount);
+    try {
+      uploadBlockAsync(getActiveBlock(), false, false);
+    } finally {
+      // set the block to null, so the next write will create a new block.
+      clearActiveBlock();
+    }
+  }
+
+  /**
+   * Upload a block of data.
+   * This will take the block.
+   *
+   * @param blockToUpload block to upload
+   * @throws IOException     upload failure
+   * @throws PathIOException if too many blocks were written
+   */
+  private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
+      boolean isFlush, boolean isClose)
+      throws IOException {
+    if (this.isAppendBlob) {
+      writeAppendBlobCurrentBufferToService();
+      return;
+    }
+    if (!blockToUpload.hasData()) {
+      return;
+    }
+    numOfAppendsToServerSinceLastFlush++;
+
+    final int bytesLength = blockToUpload.dataSize();
+    final long offset = position;
+    position += bytesLength;
+    outputStreamStatistics.bytesToUpload(bytesLength);
+    outputStreamStatistics.writeCurrentBuffer();
+    DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
+    final Future<Void> job =
+        executorService.submit(() -> {
+          AbfsPerfTracker tracker =
+              client.getAbfsPerfTracker();
+          try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+              "writeCurrentBufferToService", "append")) {
+            AppendRequestParameters.Mode
+                mode = APPEND_MODE;
+            if (isFlush & isClose) {
+              mode = FLUSH_CLOSE_MODE;
+            } else if (isFlush) {
+              mode = FLUSH_MODE;
+            }
+            AppendRequestParameters reqParams = new AppendRequestParameters(

Review comment:
       Could you write some javadoc under AppendRequestParameters? I know it is old code, but it will be helpful for the future. Thanks.
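       One possible shape for it, judging only from how the constructor is
       invoked in this patch (the wording is a suggestion, not the actual
       javadoc):

           /**
            * Saves the parameters for an append request: the position in
            * the remote file at which to append, the offset and length of
            * the data within the buffer, the mode (append, flush, or
            * flush-and-close), whether the target is an append blob, and
            * an optional lease id.
            */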

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
##########
@@ -58,12 +68,25 @@
   private final String accountKey1 = globalKey + "." + accountName1;
   private final String accountValue1 = "one";
 
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
-            boolean isFlushEnabled,
-            boolean disableOutputStreamFlush,
-            boolean isAppendBlob) throws IOException, IllegalAccessException {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(
+      int writeBufferSize,
+      boolean isFlushEnabled,
+      boolean disableOutputStreamFlush,
+      boolean isAppendBlob,
+      AbfsClient client, FileSystem.Statistics statistics, String path,

Review comment:
       one arg per line.
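       i.e. a sketch of how the tail of this signature would look, with any
       remaining parameters following the same pattern:

           AbfsClient client,
           FileSystem.Statistics statistics,
           String path,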

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -695,12 +735,10 @@ public boolean hasLease() {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(super.toString());
-    if (outputStreamStatistics != null) {
-      sb.append("AbfsOutputStream@").append(this.hashCode());
-      sb.append("){");
-      sb.append(outputStreamStatistics.toString());
-      sb.append("}");
-    }
+    sb.append("AbfsOutputStream@").append(this.hashCode());

Review comment:
       Null check removed? I hope this was done knowingly.

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -230,27 +227,167 @@ public synchronized void write(final byte[] data, final int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
+    DataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(data, off, length);
+    int remainingCapacity = block.remainingCapacity();
+
+    if (written < length) {
+      // Number of bytes to write is more than the data block capacity,
+      // trigger an upload and then write on the next block.
+      LOG.debug("writing more data than block has capacity -triggering upload");

Review comment:
       nit: remove 'has'

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -230,27 +227,167 @@ public synchronized void write(final byte[] data, final int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
+    DataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(data, off, length);
+    int remainingCapacity = block.remainingCapacity();
+
+    if (written < length) {
+      // Number of bytes to write is more than the data block capacity,
+      // trigger an upload and then write on the next block.
+      LOG.debug("writing more data than block has capacity -triggering upload");
+      uploadCurrentBlock();
+      // tail recursion is mildly expensive, but given buffer sizes must be MB.
+      // it's unlikely to recurse very deeply.
+      this.write(data, off + written, length - written);
+    } else {
+      if (remainingCapacity == 0) {
+        // the whole buffer is done, trigger an upload
+        uploadCurrentBlock();
+      }
+    }
+    incrementWriteOps();
+  }
 
-    int currentOffset = off;
-    int writableBytes = bufferSize - bufferIndex;
-    int numberOfBytesToWrite = length;
-
-    while (numberOfBytesToWrite > 0) {
-      if (writableBytes <= numberOfBytesToWrite) {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-        bufferIndex += writableBytes;
-        writeCurrentBufferToService();
-        currentOffset += writableBytes;
-        numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-      } else {
-        System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-        bufferIndex += numberOfBytesToWrite;
-        numberOfBytesToWrite = 0;
+  /**
+   * Demand create a destination block.
+   *
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized DataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      activeBlock = blockFactory
+          .create(blockCount, this.blockSize, outputStreamStatistics);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Start an asynchronous upload of the current block.
+   *
+   * @throws IOException Problems opening the destination for upload,
+   *                     initializing the upload, or if a previous operation has failed.
+   */
+  private synchronized void uploadCurrentBlock() throws IOException {
+    checkState(hasActiveBlock(), "No active block");
+    LOG.debug("Writing block # {}", blockCount);
+    try {
+      uploadBlockAsync(getActiveBlock(), false, false);
+    } finally {
+      // set the block to null, so the next write will create a new block.
+      clearActiveBlock();
+    }
+  }
+
+  /**
+   * Upload a block of data.
+   * This will take the block.
+   *
+   * @param blockToUpload block to upload
+   * @throws IOException     upload failure
+   * @throws PathIOException if too many blocks were written

Review comment:
       Why would too many blocks cause a PathIOE?

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -219,6 +215,7 @@ public void write(final int byteVal) throws IOException {
   @Override
   public synchronized void write(final byte[] data, final int off, final int length)
       throws IOException {
+    DataBlocks.validateWriteArgs(data, off, length);
     maybeThrowLastError();
 
     Preconditions.checkArgument(data != null, "null data");

Review comment:
       Can this be combined with the validation above?
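       A sketch of the combined form; DataBlocks.validateWriteArgs already
       rejects a null buffer via Preconditions.checkNotNull, so the separate
       Preconditions.checkArgument(data != null, ...) below could be dropped:

           DataBlocks.validateWriteArgs(data, off, length);
           maybeThrowLastError();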

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -579,9 +641,7 @@ private synchronized void shrinkWriteOperationQueue() throws IOException {
         lastTotalAppendOffset += writeOperations.peek().length;
         writeOperations.remove();
         // Incrementing statistics to indicate queue has been shrunk.
-        if (outputStreamStatistics != null) {
-          outputStreamStatistics.queueShrunk();
-        }
+        outputStreamStatistics.queueShrunk();

Review comment:
       nit: refactor writeOperations.peek() into a separate variable so that it is called only once.
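       Something like this sketch (assuming the queue element type is the
       stream's WriteOperation class):

           WriteOperation peekedWrite = writeOperations.peek();
           lastTotalAppendOffset += peekedWrite.length;
           writeOperations.remove();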

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -389,131 +527,54 @@ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) thro
 
   private synchronized void flushInternalAsync() throws IOException {
     maybeThrowLastError();
-    writeCurrentBufferToService();
+    if (getActiveBlock() != null && getActiveBlock().hasData()) {
+      uploadCurrentBlock();
+    }
+    waitForAppendsToComplete();
     flushWrittenBytesToServiceAsync();
   }
 
   private void writeAppendBlobCurrentBufferToService() throws IOException {
-    if (bufferIndex == 0) {
+    DataBlocks.DataBlock activeBlock = getActiveBlock();
+    // No data, return.
+    if (activeBlock == null || !activeBlock.hasData()) {
       return;
     }
-    final byte[] bytes = buffer;
-    final int bytesLength = bufferIndex;
-    if (outputStreamStatistics != null) {
-      outputStreamStatistics.writeCurrentBuffer();
-      outputStreamStatistics.bytesToUpload(bytesLength);
-    }
-    buffer = byteBufferPool.getBuffer(false, bufferSize).array();
-    bufferIndex = 0;
+
+    final int bytesLength = activeBlock.dataSize();
+    DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
+    clearActiveBlock();
+    outputStreamStatistics.writeCurrentBuffer();
+    outputStreamStatistics.bytesToUpload(bytesLength);
     final long offset = position;
     position += bytesLength;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
-            "writeCurrentBufferToService", "append")) {
+        "writeCurrentBufferToService", "append")) {
       AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
           bytesLength, APPEND_MODE, true, leaseId);
-      AbfsRestOperation op = client
-          .append(path, bytes, reqParams, cachedSasToken.get(),
-              new TracingContext(tracingContext));
+      AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
+          cachedSasToken.get(), new TracingContext(tracingContext));
       cachedSasToken.update(op.getSasToken());
-      if (outputStreamStatistics != null) {
-        outputStreamStatistics.uploadSuccessful(bytesLength);
-      }
+      outputStreamStatistics.uploadSuccessful(bytesLength);
+
       perfInfo.registerResult(op.getResult());
-      byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
       perfInfo.registerSuccess(true);
       return;
     } catch (Exception ex) {
-      if (ex instanceof AbfsRestOperationException) {
-        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
-          throw new FileNotFoundException(ex.getMessage());
-        }
-      }
-      if (ex instanceof AzureBlobFileSystemException) {
-        ex = (AzureBlobFileSystemException) ex;
-      }
-      lastError = new IOException(ex);
-      throw lastError;
-    }
-  }
-
-  private synchronized void writeCurrentBufferToService() throws IOException {
-    writeCurrentBufferToService(false, false);
-  }
-
-  private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
-    if (this.isAppendBlob) {
-      writeAppendBlobCurrentBufferToService();
-      return;
-    }
-
-    if (bufferIndex == 0) {
-      return;
-    }
-    numOfAppendsToServerSinceLastFlush++;
-
-    final byte[] bytes = buffer;
-    final int bytesLength = bufferIndex;
-    if (outputStreamStatistics != null) {
-      outputStreamStatistics.writeCurrentBuffer();
-      outputStreamStatistics.bytesToUpload(bytesLength);
-    }
-    buffer = byteBufferPool.getBuffer(false, bufferSize).array();
-    bufferIndex = 0;
-    final long offset = position;
-    position += bytesLength;
-
-    if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
-      //Tracking time spent on waiting for task to complete.
-      if (outputStreamStatistics != null) {
-        try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
-          waitForTaskToComplete();
-        }
-      } else {
-        waitForTaskToComplete();
-      }
-    }
-    final Future<Void> job = completionService.submit(() -> {
-      AbfsPerfTracker tracker = client.getAbfsPerfTracker();
-          try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
-              "writeCurrentBufferToService", "append")) {
-            AppendRequestParameters.Mode
-                mode = APPEND_MODE;
-            if (isFlush & isClose) {
-              mode = FLUSH_CLOSE_MODE;
-            } else if (isFlush) {
-              mode = FLUSH_MODE;
-            }
-            AppendRequestParameters reqParams = new AppendRequestParameters(
-                offset, 0, bytesLength, mode, false, leaseId);
-            AbfsRestOperation op = client.append(path, bytes, reqParams,
-                cachedSasToken.get(), new TracingContext(tracingContext));
-            cachedSasToken.update(op.getSasToken());
-            perfInfo.registerResult(op.getResult());
-            byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
-            perfInfo.registerSuccess(true);
-            return null;
-          }
-        });
-
-    if (outputStreamStatistics != null) {
-      if (job.isCancelled()) {
-        outputStreamStatistics.uploadFailed(bytesLength);
-      } else {
-        outputStreamStatistics.uploadSuccessful(bytesLength);
-      }
+      outputStreamStatistics.uploadFailed(bytesLength);
+      failureWhileSubmit(ex);
+    } finally {
+      uploadData.close();

Review comment:
       Use IOUtils.close(), which has a null check?
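       e.g. a sketch of the finally clause using the null-safe helper from
       org.apache.hadoop.io.IOUtils (would need the corresponding static
       import in this file):

           } finally {
             // null-safe: skips the close when uploadData is null.
             cleanupWithLogger(LOG, uploadData);
           }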

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -389,131 +527,54 @@ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) thro
 
   private synchronized void flushInternalAsync() throws IOException {
     maybeThrowLastError();
-    writeCurrentBufferToService();
+    if (getActiveBlock() != null && getActiveBlock().hasData()) {
+      uploadCurrentBlock();
+    }
+    waitForAppendsToComplete();
     flushWrittenBytesToServiceAsync();
   }
 
   private void writeAppendBlobCurrentBufferToService() throws IOException {
-    if (bufferIndex == 0) {
+    DataBlocks.DataBlock activeBlock = getActiveBlock();
+    // No data, return.
+    if (activeBlock == null || !activeBlock.hasData()) {

Review comment:
       Reuse the refactored method, as Steve suggested, here as well.

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -169,18 +175,24 @@
    */
   private Set<String> appendBlobDirSet;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
-                                  Configuration configuration,
-                                  AbfsCounters abfsCounters) throws IOException {
-    this.uri = uri;
+  /** BlockFactory being used by this instance.*/
+  private DataBlocks.BlockFactory blockFactory;
+  /** Number of active data blocks per AbfsOutputStream */
+  private int blockOutputActiveBlocks;
+  /** Bounded ThreadPool for this instance. */
+  private ExecutorService boundedThreadPool;
+
+  public AzureBlobFileSystemStore(
+      AzureBlobFileSystemStoreBuilder systemStoreBuilder) throws IOException {
+    this.uri = systemStoreBuilder.uri;

Review comment:
       nit: change to fsStoreBuilder?

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -579,9 +641,7 @@ private synchronized void shrinkWriteOperationQueue() throws IOException {
         lastTotalAppendOffset += writeOperations.peek().length;
         writeOperations.remove();
         // Incrementing statistics to indicate queue has been shrunk.
-        if (outputStreamStatistics != null) {
-          outputStreamStatistics.queueShrunk();
-        }
+        outputStreamStatistics.queueShrunk();

Review comment:
       Null check removed; any specific reason?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


