[ 
https://issues.apache.org/jira/browse/HADOOP-17195?focusedWorklogId=650705&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-650705
 ]

ASF GitHub Bot logged work on HADOOP-17195:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Sep/21 17:23
            Start Date: 14/Sep/21 17:23
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request #3406:
URL: https://github.com/apache/hadoop/pull/3406#discussion_r708375778



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+public class DataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBlocks.class);
+
+  private DataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   *
+   * @param b   byte array containing data.
+   * @param off offset in array where to start.
+   * @param len number of bytes to be written.
+   * @throws NullPointerException      for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  public static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   *
+   * @param KeyToBufferDir Key to buffer directory config for a FS.
+   * @param configuration  factory configurations.
+   * @param name           factory name -the option from {@link CommonConfigurationKeys}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  public static BlockFactory createFactory(String KeyToBufferDir,
+      Configuration configuration,
+      String name) {
+    LOG.debug("Creating DataFactory of type : {}", name);
+    switch (name) {
+    case CommonConfigurationKeys.DATA_BLOCKS_BUFFER_ARRAY:
+      return new ArrayBlockFactory(KeyToBufferDir, configuration);
+    case CommonConfigurationKeys.DATA_BLOCKS_BUFFER_DISK:
+      return new DiskBlockFactory(KeyToBufferDir, configuration);
+    case CommonConfigurationKeys.DATA_BLOCKS_BYTEBUFFER:
+      return new ByteBufferBlockFactory(KeyToBufferDir, configuration);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {
+    private final File file;
+    private InputStream uploadStream;
+    private byte[] byteArray;
+    private boolean isClosed;
+
+    public BlockUploadData(byte[] byteArray) {
+      this.file = null;
+      this.uploadStream = null;
+
+      this.byteArray = requireNonNull(byteArray);
+    }
+
+    /**
+     * File constructor; input stream will be null.
+     *
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);

Review comment:
       can you go to the `checkArgument(file.exists(), "no file %s", file)` 
form? Slightly more efficient. The S3A version is old code...no need to 
replicate its age
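
A minimal sketch of why the template form is slightly cheaper: with string concatenation the message is built on every call, while a `%s` template is only formatted when the check fails. Guava's `Preconditions.checkArgument(boolean, String, Object...)` has this shape; the local `checkArgument` helper and the `TemplateCheckSketch` class below are hypothetical stand-ins so the example runs without Guava on the classpath.

```java
import java.io.File;

/** Illustrative stand-in for a template-based precondition check. */
final class TemplateCheckSketch {

  private TemplateCheckSketch() {
  }

  /**
   * Hypothetical helper mirroring the shape of Guava's
   * checkArgument(boolean, String, Object...): the message is only
   * formatted when the expression is false.
   */
  static void checkArgument(boolean expression, String template,
      Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Validate that a file exists, building the message only on failure. */
  static File requireExisting(File file) {
    checkArgument(file.exists(), "No file: %s", file);
    return file;
  }
}
```

On the success path no string is built at all, which is the saving the comment refers to.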

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+public class DataBlocks {

Review comment:
       can you add a quick summary javadoc: 
   1. what does this do?
   1. Why is it needed (i.e. to handle mismatch between data generation rate and 
upload speed, and avoid running out of memory in uploads)
   1. where did it come from/what is its history.
   
   plus: mark as final

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -169,18 +175,24 @@
    */
   private Set<String> appendBlobDirSet;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
-                                  Configuration configuration,
-                                  AbfsCounters abfsCounters) throws IOException {
-    this.uri = uri;
+  /** BlockFactory being used by this instance.*/
+  private DataBlocks.BlockFactory blockFactory;
+  /** Number of active data blocks per AbfsOutputStream */
+  private int blockOutputActiveBlocks;
+  /** Bounded ThreadPool for this instance. */
+  private ExecutorService boundedThreadPool;
+
+  public AzureBlobFileSystemStore(

Review comment:
       add javadoc

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -625,8 +644,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
     return op;
   }
 
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
-      AbfsLease lease) {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(

Review comment:
       1. Add javadoc
   2. split to one arg/line

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.

Review comment:
       add "or after a call to {@link #toByteArray()}".

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
##########
@@ -464,4 +464,40 @@
   /** Default value for IOStatistics logging level. */
   public static final String IOSTATISTICS_LOGGING_LEVEL_DEFAULT
       = IOSTATISTICS_LOGGING_LEVEL_DEBUG;
+
+  /**

Review comment:
       issue: Should we make this an fs.data option, or should it be fs.azure 
specific?
   
   I actually think we should go with an fs.azure option, to go with the buffer 
dir setting.
   
   

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -389,131 +527,54 @@ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) thro
 
   private synchronized void flushInternalAsync() throws IOException {
     maybeThrowLastError();
-    writeCurrentBufferToService();
+    if (getActiveBlock() != null && getActiveBlock().hasData()) {

Review comment:
       this probe seems repeated about 4x, here, L521, L506 (I think), L538. 
Consider factoring out into a `hasActiveBlockDataToUpload()` predicate and 
reusing
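
A minimal sketch of the suggested predicate, assuming the probe is always the same null check plus `hasData()`. The `Block` interface and the `ActiveBlockProbeSketch` class are hypothetical stand-ins for illustration, not the real `DataBlocks.DataBlock` or `AbfsOutputStream` types.

```java
/** Sketch of factoring a repeated null-and-has-data probe into a predicate. */
final class ActiveBlockProbeSketch {

  /** Hypothetical minimal view of a data block; not the real DataBlock API. */
  interface Block {
    boolean hasData();
  }

  private final Block activeBlock;

  ActiveBlockProbeSketch(Block activeBlock) {
    this.activeBlock = activeBlock;
  }

  synchronized Block getActiveBlock() {
    return activeBlock;
  }

  /**
   * Predicate: is there an active block holding data to upload?
   * Reads the block once so both checks see the same reference.
   *
   * @return true if an active block exists and has data.
   */
  synchronized boolean hasActiveBlockDataToUpload() {
    Block block = getActiveBlock();
    return block != null && block.hasData();
  }
}
```

Each call site then reduces to `if (hasActiveBlockDataToUpload()) { uploadCurrentBlock(); }`.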

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {
+    private final File file;
+    private InputStream uploadStream;
+    private byte[] byteArray;
+    private boolean isClosed;
+
+    public BlockUploadData(byte[] byteArray) {
+      this.file = null;
+      this.uploadStream = null;
+
+      this.byteArray = requireNonNull(byteArray);
+    }
+
+    /**
+     * File constructor; input stream will be null.
+     *
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);
+      this.file = file;
+      this.uploadStream = null;
+      this.byteArray = null;
+    }
+
+    /**
+     * Stream constructor, file field will be null.
+     *
+     * @param uploadStream stream to upload.
+     */
+    BlockUploadData(InputStream uploadStream) {
+      requireNonNull(uploadStream, "rawUploadStream");
+      this.uploadStream = uploadStream;
+      this.file = null;
+      this.byteArray = null;
+    }
+
+    /**
+     * Predicate: does this instance contain a file reference.
+     *
+     * @return true if there is a file.
+     */
+    boolean hasFile() {
+      return file != null;
+    }
+
+    /**
+     * Get the file, if there is one.
+     *
+     * @return the file for uploading, or null.
+     */
+    File getFile() {
+      return file;
+    }
+
+    /**
+     * Get the raw upload stream, if the object was
+     * created with one.
+     *
+     * @return the upload stream or null.
+     */
+    InputStream getUploadStream() {
+      return uploadStream;
+    }
+
+    /**
+     * A Method to return the uploadBlock into byteArray.

Review comment:
       Rewrite as 
   ```
   Convert to a byte array.
   If the data is stored in a file, it will be read and not cached.
   If the data was passed in via an input stream (which happens if the data is
   stored in a bytebuffer) then it will be converted to a byte array -which will
   then be cached for any subsequent use.
   ```
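
A minimal sketch of the behaviour that javadoc describes, under the assumption that the file case re-reads on every call and only the stream case caches. `UploadDataSketch` is a hypothetical stand-in, not the actual `BlockUploadData` implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

/** Illustrative model of the described toByteArray() semantics. */
final class UploadDataSketch {
  private final File file;
  private final InputStream uploadStream;
  private byte[] byteArray;

  UploadDataSketch(File file) {
    this.file = file;
    this.uploadStream = null;
  }

  UploadDataSketch(InputStream uploadStream) {
    this.file = null;
    this.uploadStream = uploadStream;
  }

  /**
   * Convert to a byte array.
   * File data is read on each call and not cached; stream data is
   * drained once and the resulting array is cached.
   */
  byte[] toByteArray() throws IOException {
    if (byteArray != null) {
      return byteArray;
    }
    if (file != null) {
      // Deliberately not cached: each call re-reads the file.
      return Files.readAllBytes(file.toPath());
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[8192];
    int read;
    while ((read = uploadStream.read(buffer)) != -1) {
      out.write(buffer, 0, read);
    }
    // A stream can only be consumed once, so keep the result.
    byteArray = out.toByteArray();
    return byteArray;
  }
}
```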

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -389,131 +527,54 @@ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) thro
 
   private synchronized void flushInternalAsync() throws IOException {
     maybeThrowLastError();
-    writeCurrentBufferToService();
+    if (getActiveBlock() != null && getActiveBlock().hasData()) {
+      uploadCurrentBlock();
+    }
+    waitForAppendsToComplete();
     flushWrittenBytesToServiceAsync();
   }
 
   private void writeAppendBlobCurrentBufferToService() throws IOException {

Review comment:
       can you add a quick javadoc here too, something about 
   - blocking upload of the current active block.
   - clearing active block
   - releasing all buffered data
   
   

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
##########
@@ -56,6 +56,21 @@
   public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
+  /**

Review comment:
       nit: add a blank line

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -42,6 +42,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.fs.store.DataBlocks;

Review comment:
       this should go into the real org.apache import block

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1064 @@
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {
+    private final File file;
+    private InputStream uploadStream;
+    private byte[] byteArray;
+    private boolean isClosed;
+
+    public BlockUploadData(byte[] byteArray) {

Review comment:
       nit: add javadoc

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
##########
@@ -60,6 +61,9 @@
   private final AtomicLong writeCurrentBufferOps =
       ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
 
+  private final AtomicInteger blocksAllocated = new AtomicInteger(0);

Review comment:
       we aren't tracking these in IOStats. Thoughts? 
   * The blocksAllocated counter can go in
   * in theory, we could have a gauge of activeBlocks == allocated - released; 
not sure if it is worth the complexity
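
A rough sketch of what the derived gauge could look like, assuming a second counter for released blocks is kept alongside `blocksAllocated`. `BlockStatsSketch`, `blocksReleased` and the method names are hypothetical; nothing here is wired into the IOStatistics API.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative counters with a derived "active blocks" gauge. */
final class BlockStatsSketch {
  private final AtomicInteger blocksAllocated = new AtomicInteger(0);
  private final AtomicInteger blocksReleased = new AtomicInteger(0);

  /** Record that a block has been allocated. */
  void blockAllocated() {
    blocksAllocated.incrementAndGet();
  }

  /** Record that a block has been released. */
  void blockReleased() {
    blocksReleased.incrementAndGet();
  }

  int getBlocksAllocated() {
    return blocksAllocated.get();
  }

  /**
   * Gauge: activeBlocks == allocated - released.
   * The two counters are read separately, so this is a snapshot;
   * reading "released" first keeps the result non-negative as long
   * as every release follows its allocation.
   */
  int getActiveBlocks() {
    int released = blocksReleased.get();
    int allocated = blocksAllocated.get();
    return allocated - released;
  }
}
```

The extra state is one counter and one subtraction, which is roughly the complexity the comment is weighing.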

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -233,6 +245,13 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
       this.appendBlobDirSet = new HashSet<>(Arrays.asList(
           abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
     }
+    this.blockFactory = systemStoreBuilder.blockFactory;
+    this.blockOutputActiveBlocks = systemStoreBuilder.blockOutputActiveBlocks;
+    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+        abfsConfiguration.getWriteMaxConcurrentRequestCount(),
+        abfsConfiguration.getMaxWriteRequestsToQueue(),
+        10L, TimeUnit.SECONDS,

Review comment:
       what is the 10L here?

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -125,6 +131,13 @@
   private TracingHeaderFormat tracingHeaderFormat;
   private Listener listener;
 
+  /** Name of blockFactory to be used by AbfsOutputStream. */
+  private String blockOutputBuffer;

Review comment:
       nit: blank lines before each javadoc

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
##########
@@ -56,6 +56,21 @@
   public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
+  /**
+   * Maximum Number of blocks a single output stream can have
+   * active (uploading, or queued to the central FileSystem
+   * instance's pool of queued operations.
+   * This stops a single stream overloading the shared thread pool.
+   * {@value}
+   * <p>
+   * Default is {@link FileSystemConfigurations#BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT}
+   */
+  public static final String FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS =
+      "fs.azure.block.upload.active.blocks";
+
+  /** Buffer directory path for uploading AbfsOutputStream data blocks. */
+  public static final String FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR = "fs.azure.buffer.dir";

Review comment:
       1. nit: Add a @value tag
       2. can we have a core-default.xml entry of ${hadoop.tmp.dir}/abfs? Having an isolated s3a one helped identify what was causing Spark jobs to run out of temp disk space. The default for the buffer option can go in there too.

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
##########
@@ -149,9 +179,10 @@ public void verifyWriteRequest() throws Exception {
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(TracingContext.class))).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
-        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
-        tracingContext);
+    AbfsOutputStream out = new AbfsOutputStream(
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false, client,

Review comment:
       one arg/line again

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -546,12 +569,8 @@ public OutputStream createFile(final Path path,
       AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
 
       return new AbfsOutputStream(
-          client,
-          statistics,
-          relativePath,
-          0,
-          populateAbfsOutputStreamContext(isAppendBlob, lease),
-          tracingContext);
+          populateAbfsOutputStreamContext(isAppendBlob, lease, client,

Review comment:
       nit: can you still keep one argument per line, as before

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -768,12 +799,8 @@ public OutputStream openFileForWrite(final Path path,
       AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
 
       return new AbfsOutputStream(
-          client,
-          statistics,
-          relativePath,
-          offset,
-          populateAbfsOutputStreamContext(isAppendBlob, lease),
-          tracingContext);
+          populateAbfsOutputStreamContext(isAppendBlob, lease, client,

Review comment:
       same: one entry/line






Issue Time Tracking
-------------------

    Worklog Id:     (was: 650705)
    Time Spent: 2h  (was: 1h 50m)

> Intermittent OutOfMemory error while performing hdfs CopyFromLocal to abfs 
> ---------------------------------------------------------------------------
>
>                 Key: HADOOP-17195
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17195
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/azure
>    Affects Versions: 3.3.0
>            Reporter: Mehakmeet Singh
>            Assignee: Mehakmeet Singh
>            Priority: Major
>              Labels: abfsactive, pull-request-available
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> OutOfMemory error caused by a new ThreadPool being created each time an
> AbfsOutputStream is created. Since the thread pools aren't bounded, a lot of
> data is loaded into buffers, which causes the OutOfMemory error.
> Possible fixes:
> - Limit the thread count while performing hdfs copyFromLocal (using the
> -t option).
> - Reduce OUTPUT_BUFFER_SIZE significantly, which would limit the amount of
> buffered data held by the threads.
> - Don't create a new ThreadPool each time an AbfsOutputStream is created, and
> limit the number of ThreadPools each AbfsOutputStream can create.
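
The last option in the list above (one shared pool, plus a cap on how much each stream can have in flight) can be sketched with plain JDK classes. This is an illustration under stated assumptions, not the code in the PR: BoundedBlockUploader, submit and availableSlots are hypothetical names, and a Semaphore stands in for whatever limiting mechanism the actual patch uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch: one shared pool, with a per-stream cap on in-flight blocks. */
class BoundedBlockUploader {
  private final ExecutorService sharedPool;
  private final Semaphore activeBlocks;

  BoundedBlockUploader(ExecutorService sharedPool, int maxActiveBlocks) {
    this.sharedPool = sharedPool;
    this.activeBlocks = new Semaphore(maxActiveBlocks);
  }

  /** Blocks the writer until this stream has a free slot, then queues the upload. */
  Future<?> submit(Runnable upload) throws InterruptedException {
    activeBlocks.acquire();
    try {
      return sharedPool.submit(() -> {
        try {
          upload.run();
        } finally {
          // Free the slot whether the upload succeeded or failed.
          activeBlocks.release();
        }
      });
    } catch (RuntimeException e) {
      // Submission was rejected, so the task will never release the slot itself.
      activeBlocks.release();
      throw e;
    }
  }

  /** Number of blocks this stream may still queue without blocking. */
  int availableSlots() {
    return activeBlocks.availablePermits();
  }
}
```

Because the writer blocks in submit once its quota is used up, the number of buffers a single stream can hold is bounded by maxActiveBlocks, and no stream can monopolise the shared pool.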



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
