[ 
https://issues.apache.org/jira/browse/HADOOP-17250?focusedWorklogId=614439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-614439
 ]

ASF GitHub Bot logged work on HADOOP-17250:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jun/21 10:34
            Start Date: 24/Jun/21 10:34
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request #3110:
URL: https://github.com/apache/hadoop/pull/3110#discussion_r657809804



##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
##########
@@ -68,6 +68,14 @@
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
   public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
+
+  /**
+   * Read ahead range parameter which can be set by user.
+   * Default value is {@code FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}.

Review comment:
       Use {@link} here instead of {@code}?

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -696,6 +713,11 @@ public boolean hasCapability(String capability) {
     return buffer;
   }
 
+  @VisibleForTesting
+  public int getReadAheadRange() {

Review comment:
       do we also want to include this in toString()?
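
A minimal sketch of what including the value in toString() could look like. The class below is a hypothetical stand-in written for illustration, not the real AbfsInputStream, and its field names and output layout are assumptions:

```java
// Hypothetical stand-in for a stream class; this is NOT the real AbfsInputStream.
final class ReadAheadStreamSketch {
  private final String path;
  private final int readAheadRange;

  ReadAheadStreamSketch(String path, int readAheadRange) {
    this.path = path;
    this.readAheadRange = readAheadRange;
  }

  // Mirrors the getter added in the diff above.
  int getReadAheadRange() {
    return readAheadRange;
  }

  // Including the range here makes it visible in logs and assertion messages.
  @Override
  public String toString() {
    return "ReadAheadStreamSketch{path=" + path
        + ", readAheadRange=" + readAheadRange + "}";
  }

  public static void main(String[] args) {
    System.out.println(new ReadAheadStreamSketch("/data/part-0000", 65536));
  }
}
```

With the value in toString(), a failing test that prints the stream shows the configured range without needing the test-only getter.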

##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
##########
@@ -47,6 +66,210 @@ protected Configuration createConfiguration() {
 
   @Override
   protected AbstractFSContract createContract(final Configuration conf) {
+    conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
+    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
     return new AbfsFileSystemContract(conf, isSecure);
   }
+
+  /**
+   * Test verifies if the data is read correctly
+   * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set.
+   */
+  @Test
+  public void testSeekAndReadWithReadAhead() throws IOException {
+    describe(" Testing seek and read with read ahead " +
+            "enabled for random reads");
+
+    Path testSeekFile = path(getMethodName() + "bigseekfile.txt");
+    createDataSet(testSeekFile);
+    try (FSDataInputStream in = getFileSystem().open(testSeekFile)) {
+      AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream());
+      AbfsInputStreamStatisticsImpl streamStatistics =
+              (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics();
+      assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE),

Review comment:
       If you use AssertJ's Assertions.assertThat(), you can pass the format 
string and its arguments straight to .describedAs() instead of calling 
String.format().

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -207,6 +215,22 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     }
     incrementReadOps();
     do {
+      if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) {

Review comment:
       Can you make `fCursor - limit` a variable with a clear name and use it 
below too, e.g.
   
   ```
   int bytesLeftInBuffer = fCursor - limit;
   ```
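
One way the suggestion could be sketched, assuming (from the cast in the diff) that fCursor is a long file offset and limit is the int count of valid bytes in the buffer. The class, the method, and the name bufferStartOffset are hypothetical and chosen for this sketch only; the PR may well settle on a different name:

```java
// Hypothetical helper; it only mirrors the range check quoted in the diff above.
final class BufferWindowSketch {

  /**
   * Returns the index inside the buffer that corresponds to nextReadPos,
   * or -1 when nextReadPos lies outside the buffered window.
   */
  static int bufferIndexFor(long nextReadPos, long fCursor, int limit) {
    // File offset of the first byte currently held in the buffer.
    final long bufferStartOffset = fCursor - limit;
    if (nextReadPos >= bufferStartOffset && nextReadPos <= fCursor) {
      // Some, possibly all, of the data can be read from the buffer.
      // An index equal to limit means the buffer is exhausted and must be refilled.
      return (int) (nextReadPos - bufferStartOffset);
    }
    return -1;
  }

  public static void main(String[] args) {
    // Buffer holds file offsets 100..199 (fCursor = 200, limit = 100).
    System.out.println(bufferIndexFor(150, 200, 100));
    System.out.println(bufferIndexFor(99, 200, 100));
  }
}
```

Naming the expression once keeps the range check and the index computation visibly tied to the same value.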

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
##########
@@ -878,6 +884,10 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
     }
   }
 
+  public int getReadAheadRange() {
+    return this.readAheadRange;

Review comment:
       ok

##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
##########
@@ -47,6 +66,210 @@ protected Configuration createConfiguration() {
 
   @Override
   protected AbstractFSContract createContract(final Configuration conf) {
+    conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
+    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
     return new AbfsFileSystemContract(conf, isSecure);
   }
+
+  /**
+   * Test verifies if the data is read correctly
+   * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set.
+   */

Review comment:
       Can you add a comment explaining why this test is so big, i.e. that you 
do want a complex sequence of backward and forward reads, which splitting the 
test up would not deliver? This is closer to real-world use.

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -207,6 +215,22 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     }
     incrementReadOps();
     do {
+      if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) {
+        // data can be read from buffer.

Review comment:
       nit: add "some, possibly all the" and explain that this block determines 
the position in the buffer to read from.

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
##########
@@ -68,6 +68,14 @@
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
   public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
+
+  /**
+   * Read ahead range parameter which can be set by user.
+   * Default value is {@code FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}.
+   * This might reduce number of calls to remote as next requested
+   * data could already be present in buffer.
+   */

Review comment:
       add a
   ```
   Value {@value}.
   ```
   (see fs.s3a.Constants for examples)
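
A rough sketch of how the Javadoc might read with both the `{@value}` tag and the `{@link}` raised earlier in this review. The class name, the key string, and the default value shown here are assumptions made for illustration; the diff above does not show them:

```java
// Hypothetical constants holder; names and values are assumptions for illustration.
final class ReadAheadKeysSketch {

  /** Assumed default, used only so that the link below resolves. */
  static final int DEFAULT_READ_AHEAD_RANGE = 64 * 1024;

  /**
   * Read ahead range parameter which can be set by user.
   * Default value is {@link #DEFAULT_READ_AHEAD_RANGE}.
   * This might reduce number of calls to remote as next requested
   * data could already be present in buffer.
   * Value {@value}.
   */
  static final String AZURE_READ_AHEAD_RANGE = "fs.azure.readahead.range";

  private ReadAheadKeysSketch() {
  }

  public static void main(String[] args) {
    System.out.println(AZURE_READ_AHEAD_RANGE);
  }
}
```

In generated Javadoc, `{@value}` on a constant field renders the constant's value, so readers see the actual configuration key without opening the source.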

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -207,6 +215,22 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     }
     incrementReadOps();
     do {
+      if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) {
+        // data can be read from buffer.
+        bCursor = (int) (nextReadPos - (fCursor - limit));
+
+        // When bCursor == limit, buffer will be filled again.
+        // So in this case we are not actually reading from buffer.
+        if(bCursor != limit && streamStatistics != null) {

Review comment:
       nit: add a space after `if`, i.e. `if (`

##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
##########
@@ -47,6 +66,210 @@ protected Configuration createConfiguration() {
 
   @Override
   protected AbstractFSContract createContract(final Configuration conf) {
+    conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
+    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
     return new AbfsFileSystemContract(conf, isSecure);
   }
+
+  /**
+   * Test verifies if the data is read correctly
+   * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set.
+   */
+  @Test
+  public void testSeekAndReadWithReadAhead() throws IOException {
+    describe(" Testing seek and read with read ahead " +
+            "enabled for random reads");
+
+    Path testSeekFile = path(getMethodName() + "bigseekfile.txt");
+    createDataSet(testSeekFile);
+    try (FSDataInputStream in = getFileSystem().open(testSeekFile)) {
+      AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream());
+      AbfsInputStreamStatisticsImpl streamStatistics =
+              (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics();
+      assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE),
+              MIN_BUFFER_SIZE, inStream.getReadAheadRange());
+
+      long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations();
+      assertEquals("Number of remote read ops should be 0 " +
+              "before any read call is made", 0, remoteReadOperationsOldVal);
+
+      // Test read at first position. Remote read.
+      assertEquals("First call to getPos() should return 0",
+              0, inStream.getPos());
+      assertDataAtPos(0,  (byte) in.read());
+      assertSeekBufferStats(0, streamStatistics.getSeekInBuffer());
+      long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seeking just before read ahead range. Read from buffer.
+      int newSeek = inStream.getReadAheadRange() - 1;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seeking boundary of read ahead range. Read from buffer manager.
+      newSeek = inStream.getReadAheadRange();
+      inStream.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seeking just after read ahead range. Read from buffer.
+      newSeek = inStream.getReadAheadRange() + 1;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(2, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seeking just 10 more bytes such that data is read from buffer.
+      newSeek += 10;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek backward such that data is read from remote.
+      newSeek -= 100;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seeking just 10 more bytes such that data is read from buffer.
+      newSeek += 10;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Read multiple bytes across read ahead range. Remote read.
+      long oldSeek = newSeek;
+      newSeek = 2*inStream.getReadAheadRange() -1;
+      byte[] bytes = new byte[5];
+      in.readFully(newSeek, bytes);
+      // With readFully getPos should return oldSeek pos.
+      // Adding one as one byte is already read
+      // after the last seek is done.
+      assertGetPosition(oldSeek + 1, in.getPos());
+      assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
+      assertDatasetEquals(newSeek, "Read across read ahead ",
+              bytes, bytes.length);
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+              remoteReadOperationsNewVal);
+    }
+  }
+
+  /**
+   * Test to validate the getPos() when a seek is done
+   * post {@code AbfsInputStream#unbuffer} call is made.
+   * Also using optimised builder api to open file.
+   */
+  @Test
+  public void testSeekAfterUnbuffer() throws IOException {
+    describe("Test to make sure that seeking in AbfsInputStream after " +
+            "unbuffer() call is not doing any IO.");
+    Path testFile = path(getMethodName() + ".txt");
+    createDataSet(testFile);
+    final CompletableFuture<FSDataInputStream> future =
+            getFileSystem().openFile(testFile)
+                    .build();
+    try (FSDataInputStream inputStream = awaitFuture(future)) {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) inputStream.getWrappedStream();
+      AbfsInputStreamStatisticsImpl streamStatistics =
+              (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
+      int readAheadRange = abfsInputStream.getReadAheadRange();
+      long seekPos = readAheadRange;
+      inputStream.seek(seekPos);
+      assertDataAtPos(readAheadRange, (byte) inputStream.read());
+      long currentRemoteReadOps = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(0, currentRemoteReadOps);
+      inputStream.unbuffer();
+      seekPos -= 10;
+      inputStream.seek(seekPos);
+      // Seek backwards shouldn't do any IO
+      assertNoIncrementInRemoteReadOps(currentRemoteReadOps, streamStatistics.getRemoteReadOperations());
+      assertGetPosition(seekPos, inputStream.getPos());
+    }
+  }
+
+  private void createDataSet(Path path) throws IOException {
+    createFile(getFileSystem(), path, true, BLOCK);
+  }
+
+  private void assertGetPosition(long expected, long actual) {
+    final String seekPosErrorMsg = "getPos() should return %s";
+    assertEquals(String.format(seekPosErrorMsg, expected), expected, actual);
+  }
+
+  private void assertDataAtPos(int pos, byte actualData) {
+    final String dataErrorMsg = "Mismatch in data@%s";
+    assertEquals(String.format(dataErrorMsg, pos), BLOCK[pos], actualData);
+  }
+
+  private void assertSeekBufferStats(long expected, long actual) {
+    final String statsErrorMsg = "Mismatch in seekInBuffer counts";
+    assertEquals(statsErrorMsg, expected, actual);
+  }
+
+  private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) {
+    final String incrementErrorMsg = "Number of remote read ops shouldn't increase";
+    assertEquals(incrementErrorMsg, oldVal, newVal);
+  }
+
+  private void assertIncrementInRemoteReadOps(long oldVal, long newVal) {
+    final String incrementErrorMsg = "Number of remote read ops should increase";
+    Assertions.assertThat(newVal)
+            .describedAs(incrementErrorMsg)
+            .isGreaterThan(oldVal);
+  }
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   */
+  private void assertDatasetEquals(
+          final int readOffset,
+          final String operation,
+          final byte[] data,
+          int length) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + "with read offset " + readOffset

Review comment:
       Have a look to see if AssertJ does this better; if not, leave it as is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 614439)
    Time Spent: 3.5h  (was: 3h 20m)

> ABFS: Random read perf improvement
> ----------------------------------
>
>                 Key: HADOOP-17250
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17250
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.3.0
>            Reporter: Sneha Vijayarajan
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: abfsactive, pull-request-available
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Random reads, when combined with a marginal read-ahead, were seen to improve 
> perf for a TPCH query. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
