[ https://issues.apache.org/jira/browse/HADOOP-18231?focusedWorklogId=787052&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-787052 ]

ASF GitHub Bot logged work on HADOOP-18231:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jul/22 11:26
            Start Date: 01/Jul/22 11:26
    Worklog Time Spent: 10m 
      Work Description: dannycjones commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r911877043


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.net.URI;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Test the prefetching input stream; validates that the underlying
+ * S3CachingInputStream and S3InMemoryInputStream work as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3PrefetchingInputStream.class);
+
+  private static final int S_1K = 1024;
+  private static final int S_1M = S_1K * S_1K;
+  // Path for a file whose length should be > block size, so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem largeFileFS;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = S_1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    cleanupWithLogger(LOG, largeFileFS);
+    largeFileFS = null;
+  }
+
+  private void openFS() throws Exception {
+    Configuration conf = getConfiguration();
+
+    largeFile = new Path(DEFAULT_CSVTEST_FILE);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    largeFileFS = new S3AFileSystem();
+    largeFileFS.initialize(new URI(DEFAULT_CSVTEST_FILE), getConfiguration());
+    FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
+    largeFileSize = fileStatus.getLen();
+    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+  }
+
+  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
+    if (largeFileSize == 0) {
+      return 0;
+    } else {
+      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
+    }
+  }
+
+  @Test
+  public void testReadLargeFileFully() throws Throwable {
+    describe("read a large file fully, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[S_1M * 10];
+      long bytesRead = 0;
+
+      while (bytesRead < largeFileSize) {
+        in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
+        bytesRead += buffer.length;
+      }
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
+    }
+  }
+
+  @Test
+  public void testRandomReadLargeFile() throws Throwable {
+    describe("random read on a large file, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[blockSize];
+
+      // Don't read the block completely so it gets cached on seek
+      in.read(buffer, 0, blockSize - S_1K * 10);
+      in.seek(blockSize + S_1K * 10);
+      // Backwards seek, will use cached block
+      in.seek(S_1K * 5);
+      in.read();
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
+    }
+  }
+
+  @Test
+  public void testRandomReadSmallFile() throws Throwable {
+    describe("random read on a small file, uses S3InMemoryInputStream");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = path("randomReadSmallFile");
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+      in.read(buffer, 0, S_1K * 4);
+      in.seek(S_1K * 12);
+      in.read(buffer, 0, S_1K * 4);
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
+    }
+  }
+
+}

Review Comment:
   Should I look into adding this as a CheckStyle rule? I can open a JIRA if so.
   
   Looks like there's a rule that fits: https://checkstyle.sourceforge.io/config_misc.html#NewlineAtEndOfFile
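   
   For reference, enabling that rule would be roughly a one-line module in the
   project's Checkstyle configuration. This is a hedged sketch: the module name
   comes from the Checkstyle docs linked above, but where Hadoop keeps its
   checkstyle.xml (and whether the module belongs there) would need checking.
   
   <!-- Checker-level module; flags source files that do not end in a newline. -->
   <module name="NewlineAtEndOfFile"/>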

Issue Time Tracking
-------------------

    Worklog Id:     (was: 787052)
    Time Spent: 5h 40m  (was: 5.5h)

> tests in ITestS3AInputStreamPerformance are failing 
> ----------------------------------------------------
>
>                 Key: HADOOP-18231
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18231
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Ahmar Suhail
>            Assignee: Ahmar Suhail
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> The following tests fail when prefetching is enabled:
> testRandomIORandomPolicy - expects the stream to be opened 4 times (once for
> every random read), but prefetching will only open it twice.
> testDecompressionSequential128K - expects the stream to be opened once, but
> prefetching opens it once per block. The landsat file used in the test is
> 42MB and the prefetching block size is 8MB, so the expected open count is 6
> (see the sketch below).
> testReadWithNormalPolicy - same as above.
> testRandomIONormalPolicy - executes random IO, but with a normal policy.
> S3AInputStream will abort the stream and switch the policy; prefetching
> handles random IO by caching blocks, so it does neither.
> testRandomReadOverBuffer - multiple assertions fail here, and the test
> depends heavily on readAhead values, so it is not very relevant for
> prefetching.
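
For illustration, here is a minimal, self-contained sketch of the block-count
arithmetic behind the expected open counts quoted above. It mirrors the
calculateNumBlocks() helper from the test diff; the class name and the
42MB/8MB figures are taken from the issue description, not from the PR itself.

// Hedged sketch (not PR code): expected GET / stream-open count is
// ceil(fileSize / blockSize), computed with integer arithmetic.
public class PrefetchBlockCountSketch {

  // Same ceiling division as calculateNumBlocks() in the test above.
  static int calculateNumBlocks(long fileSize, int blockSize) {
    if (fileSize == 0) {
      return 0;
    }
    return (int) (fileSize / blockSize) + (fileSize % blockSize > 0 ? 1 : 0);
  }

  public static void main(String[] args) {
    long landsatSize = 42L * 1024 * 1024; // ~42MB, per the issue description
    int blockSize = 8 * 1024 * 1024;      // 8MB prefetch block size
    // Five full 8MB blocks plus a 2MB remainder -> 6 opens expected.
    System.out.println(calculateNumBlocks(landsatSize, blockSize)); // prints 6
  }
}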



