This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 9843cc041ab4013ffc97f23152d5b015ce0f3532
Author: Mukund Thakur <[email protected]>
AuthorDate: Thu Jun 2 03:35:54 2022 +0530

    HADOOP-18107 Adding scale test for vectored reads for large file (#4273)
    
    * HADOOP-18107 Adding scale test for vectored reads for large file
    
    part of HADOOP-18103.
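
The refactor below moves validateVectoredReadResult and assertDatasetEquals out of AbstractContractVectoredReadTest into ContractTestUtils so that the new S3A scale test can reuse them. A minimal sketch of how a contract test drives the API after this change (the class and method names in the sketch are illustrative, not part of the patch):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;

// Illustrative sketch only: issue a vectored read, then check every range
// against the expected bytes with the helper this patch makes public.
public class VectoredReadValidationSketch {
  public static void readAndValidate(FileSystem fs, Path path, byte[] expected)
      throws Exception {
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(new FileRangeImpl(0, 100));        // (offset, length)
    ranges.add(new FileRangeImpl(4 * 1024, 512));
    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
    try (FSDataInputStream in = fs.open(path)) {
      in.readVectored(ranges, allocate);          // asynchronous: fills each range's future
      // Awaits each range (bounded by the new 5-minute test timeout) and
      // byte-compares its buffer with `expected` at the range's offset.
      validateVectoredReadResult(ranges, expected);
    }
  }
}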
---
 .../contract/AbstractContractVectoredReadTest.java | 86 +++-------------------
 .../hadoop/fs/contract/ContractTestUtils.java      | 65 ++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  1 +
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java |  1 -
 .../fs/s3a/impl/GetContentSummaryOperation.java    |  3 +-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    | 33 +++++++++
 6 files changed, 112 insertions(+), 77 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 756c3de85cc..e8c86b5dbbc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -53,8 +55,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
-  private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
-  private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
   private final IntFunction<ByteBuffer> allocate;
 
@@ -77,8 +77,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     Path path = path(VECTORED_READ_FILE_NAME);
     FileSystem fs = getFileSystem();
     createFile(fs, path, true, DATASET);
-    Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
-    createFile(fs, bigFile, true, DATASET_MB);
   }
 
   @Test
@@ -99,7 +97,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
       combinedFuture.get();
 
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -132,7 +130,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -149,7 +147,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -168,7 +166,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(40*1024, 1024));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -184,24 +182,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
                     .build();
     try (FSDataInputStream in = builder.get()) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
-    }
-  }
-
-  @Test
-  public void testVectoredRead1MBFile()  throws Exception {
-    FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(1293, 25837));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
-            .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, allocate);
-      ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
-      FileRange resRange = fileRanges.get(0);
-      assertDatasetEquals((int) resRange.getOffset(), "vecRead",
-              vecRes, resRange.getLength(), DATASET_MB);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -215,7 +196,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(10, 980));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -272,7 +253,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       Assertions.assertThat(in.getPos())
               .describedAs("Vectored read shouldn't change file pointer.")
               .isEqualTo(200);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -290,7 +271,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
               .describedAs("Vectored read shouldn't change file pointer.")
               .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -302,8 +283,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
-      validateVectoredReadResult(fileRanges2);
-      validateVectoredReadResult(fileRanges1);
+      validateVectoredReadResult(fileRanges2, DATASET);
+      validateVectoredReadResult(fileRanges1, DATASET);
     }
   }
 
@@ -314,27 +295,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return fileRanges;
   }
 
-  protected void validateVectoredReadResult(List<FileRange> fileRanges)
-          throws ExecutionException, InterruptedException {
-    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
-    int i = 0;
-    for (FileRange res : fileRanges) {
-      completableFutures[i++] = res.getData();
-    }
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
-    combinedFuture.get();
-
-    for (FileRange res : fileRanges) {
-      CompletableFuture<ByteBuffer> data = res.getData();
-      try {
-        ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
-        assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
-      } catch (Exception ex) {
-        LOG.error("Exception while running vectored read ", ex);
-        Assert.fail("Exception while running vectored read " + ex);
-      }
-    }
-  }
 
   protected void testExceptionalVectoredRead(FileSystem fs,
                                              List<FileRange> fileRanges,
@@ -351,26 +311,4 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
             .describedAs(s)
             .isTrue();
   }
-
-  /**
-   * Assert that the data read matches the dataset at the given offset.
-   * This helps verify that the seek process is moving the read pointer
-   * to the correct location in the file.
-   *  @param readOffset the offset in the file where the read began.
-   * @param operation  operation name for the assertion.
-   * @param data       data read in.
-   * @param length     length of data to check.
-   * @param originalData
-   */
-  private void assertDatasetEquals(
-          final int readOffset, final String operation,
-          final ByteBuffer data,
-          int length, byte[] originalData) {
-    for (int i = 0; i < length; i++) {
-      int o = readOffset + i;
-      assertEquals(operation + " with read offset " + readOffset
-                      + ": data[" + i + "] != DATASET[" + o + "]",
-              originalData[o], data.get());
-    }
-  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index eb56d957d9a..a316b02fb08 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -29,6 +30,8 @@ import org.apache.hadoop.fs.PathCapabilities;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.functional.FutureIO;
+
 import org.junit.Assert;
 import org.junit.AssumptionViolatedException;
 import org.slf4j.Logger;
@@ -39,6 +42,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,6 +53,9 @@ import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -68,6 +75,11 @@ public class ContractTestUtils extends Assert {
   public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
   public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;
 
+  /**
+   * Timeout in seconds for vectored read operations in tests: {@value}.
+   */
+  public static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60;
+
   /**
    * Assert that a property in the property set matches the expected value.
    * @param props property set
@@ -1095,6 +1107,59 @@ public class ContractTestUtils extends Assert {
                 mismatch);
   }
 
+  /**
+   * Utility to validate vectored read results.
+   * @param fileRanges input ranges.
+   * @param originalData original data.
+   * @throws IOException any ioe.
+   */
+  public static void validateVectoredReadResult(List<FileRange> fileRanges,
+                                                byte[] originalData)
+          throws IOException, TimeoutException {
+    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+    int i = 0;
+    for (FileRange res : fileRanges) {
+      completableFutures[i++] = res.getData();
+    }
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+    FutureIO.awaitFuture(combinedFuture,
+            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            TimeUnit.SECONDS);
+
+    for (FileRange res : fileRanges) {
+      CompletableFuture<ByteBuffer> data = res.getData();
+      ByteBuffer buffer = FutureIO.awaitFuture(data,
+              VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+              TimeUnit.SECONDS);
+      assertDatasetEquals((int) res.getOffset(), "vecRead",
+              buffer, res.getLength(), originalData);
+    }
+  }
+
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   *  @param readOffset the offset in the file where the read began.
+   * @param operation  operation name for the assertion.
+   * @param data       data read in.
+   * @param length     length of data to check.
+   * @param originalData original data.
+   */
+  public static void assertDatasetEquals(
+          final int readOffset,
+          final String operation,
+          final ByteBuffer data,
+          int length, byte[] originalData) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with read offset " + readOffset
+                      + ": data[" + i + "] != DATASET[" + o + "]",
+              originalData[o], data.get());
+    }
+  }
+
   /**
    * Receives test data from the given input file and checks the size of the
    * data as well as the pattern inside the received data.
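
The behavioural change worth noting in the move above: the old helper blocked indefinitely on CompletableFuture.get(), while the relocated one bounds every wait with FutureIO.awaitFuture and the new five-minute constant. A condensed sketch of that waiting pattern, under the same assumptions as the patch (the class and method names are illustrative):

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.util.functional.FutureIO;

// Sketch: bounded waits mean a hung vectored read fails fast with
// TimeoutException instead of stalling the test run forever.
public class BoundedAwaitSketch {
  static void awaitAllRanges(List<FileRange> ranges) throws Exception {
    CompletableFuture<?>[] futures = ranges.stream()
        .map(FileRange::getData)
        .toArray(CompletableFuture[]::new);
    // First gate: all ranges must complete within the overall timeout.
    FutureIO.awaitFuture(CompletableFuture.allOf(futures),
        5 * 60, TimeUnit.SECONDS);
    for (FileRange range : ranges) {
      // Per-range wait then returns immediately; the timeout is a guard.
      ByteBuffer buffer = FutureIO.awaitFuture(range.getData(),
          5 * 60, TimeUnit.SECONDS);
      // buffer holds range.getLength() bytes read at range.getOffset().
    }
  }
}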
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 23f31df1645..9d87f26c3c0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -186,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * @param ctx operation context
    * @param s3Attributes object attributes
    * @param client S3 client to use
+   * @param streamStatistics stream io stats.
    * @param unboundedThreadPool thread pool to use.
    */
   public S3AInputStream(S3AReadOpContext ctx,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 29e3df1af12..803b7757d25 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -153,7 +153,6 @@ public class S3AReadOpContext extends S3AOpContext {
   }
 
   /**
-<<<<<<< HEAD
    * Set builder value.
    * @param value new value
    * @return the builder
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java
index 248bffb9401..257cef8192b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java
@@ -220,8 +220,7 @@ public class GetContentSummaryOperation extends
 
     /***
      * List all entries under a path.
-     *
-     * @param path
+     * @param path path.
      * @param recursive if the subdirectories need to be traversed recursively
      * @return an iterator over the listing.
      * @throws IOException failure
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 15700ce9535..956e23a3f11 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -19,8 +19,13 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.IntFunction;
 
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
@@ -35,7 +40,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.Constants;
@@ -47,6 +55,7 @@ import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
@@ -446,6 +455,30 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         toHuman(timer.nanosPerOperation(ops)));
   }
 
+  @Test
+  public void test_045_vectoredIOHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    List<FileRange> rangeList = new ArrayList<>();
+    rangeList.add(new FileRangeImpl(5856368, 1167716));
+    rangeList.add(new FileRangeImpl(3520861, 1167700));
+    rangeList.add(new FileRangeImpl(8191913, 1167775));
+    rangeList.add(new FileRangeImpl(1520861, 1167700));
+    rangeList.add(new FileRangeImpl(2520861, 116770));
+    rangeList.add(new FileRangeImpl(9191913, 116770));
+    rangeList.add(new FileRangeImpl(2820861, 156770));
+    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+    FileSystem fs = getFileSystem();
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(hugefile).build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(rangeList, allocate);
+      byte[] readFullRes = new byte[(int)filesize];
+      in.readFully(0, readFullRes);
+      // Comparing vectored read results with read fully.
+      validateVectoredReadResult(rangeList, readFullRes);
+    }
+  }
+
   /**
    * Read in the entire file using read() calls.
    * @throws Throwable failure

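
The new test_045_vectoredIOHugeFile above validates vectored reads against a plain readFully of the same file, so no precomputed dataset is needed at scale. A condensed sketch of that cross-check, with illustrative path and size parameters (not from this patch):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;

// Sketch: the whole file, read contiguously, becomes the reference data
// for every vectored range -- the same trick the huge-file test uses.
public class VectoredCrossCheckSketch {
  static void crossCheck(FileSystem fs, Path file, int fileSize) throws Exception {
    List<FileRange> ranges = Arrays.asList(
        new FileRangeImpl(1024, 512),
        new FileRangeImpl(64 * 1024, 256));
    try (FSDataInputStream in = fs.openFile(file).build().get()) {
      in.readVectored(ranges, ByteBuffer::allocate);
      byte[] reference = new byte[fileSize];
      in.readFully(0, reference);              // contiguous baseline read
      validateVectoredReadResult(ranges, reference);
    }
  }
}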
