[ https://issues.apache.org/jira/browse/HADOOP-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754223#comment-17754223 ]

ASF GitHub Bot commented on HADOOP-18184:
-----------------------------------------

steveloughran commented on code in PR #5832:
URL: https://github.com/apache/hadoop/pull/5832#discussion_r1293843650


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+  private static final int BLOCK_SIZE = S_1K * 10;
+
+  private Path testFile;
+  private S3AFileSystem testFileSystem;
+  private int prefetchBlockSize;
+  private Configuration conf;
+
+  /**
+   * Thread level IOStatistics; reset in setup().
+   */
+  private IOStatisticsContext ioStatisticsContext;
+
+  private File tmpFileDir;
+
+
+  @Before
+  public void setUp() throws Exception {
+    super.setup();
+    conf = createConfiguration();
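+    // createTempFile() guarantees a unique name; replace the file with
+    // a directory of the same name to hold the prefetch cache files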
+    tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+    tmpFileDir.delete();
+    tmpFileDir.mkdirs();
+
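+    // point the S3A buffer dir at the temp dir so cache files land there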
+    conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+    String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+    testFile = new Path(testFileUri);
+    testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+    prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+    Assumptions.assumeThat(testFileStatus.getLen())
+        .describedAs("Test file %s is smaller than required size %d",
+            testFileStatus, prefetchBlockSize)
+        .isGreaterThan(prefetchBlockSize);
+
+    ioStatisticsContext = getCurrentIOStatisticsContext();
+    ioStatisticsContext.reset();
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    enablePrefetch(conf, true);
+    disableFilesystemCaching(conf);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
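+    // clear any base/per-bucket override so the setInt() below takes effect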
+    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    return conf;
+  }
+
+  @Override
+  public synchronized void teardown() throws Exception {
+    super.teardown();
+    // delete the cached block files first: File.delete() fails on a
+    // non-empty directory
+    File[] tmpFiles = tmpFileDir.listFiles();
+    if (tmpFiles != null) {
+      for (File filePath : tmpFiles) {
+        filePath.delete();
+      }
+    }
+    tmpFileDir.delete();
+    cleanupWithLogger(LOG, testFileSystem);
+  }
+
+  /**
+   * Test to verify the existence of the cache file.
+   * Performs input stream read and seek operations to trigger prefetching,
+   * then asserts that a cache file with the .bin suffix is present.
+   * It also verifies certain file statistics.
+   */
+  @Test
+  public void testCacheFileExistence() throws Throwable {
+    describe("Verify that FS cache files exist on local FS");
+    skipIfClientSideEncryption();
+
+    try (FSDataInputStream in = testFileSystem.open(testFile)) {
+      byte[] buffer = new byte[prefetchBlockSize];
+
+      in.read(buffer, 0, prefetchBlockSize - 10240);
+      assertCacheFileExists();

Review Comment:
   this turns out to be more complex than I'd thought, as there are some
long-standing behaviours in ensureCurrentBuffer which I suspect are broken.
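
One aside, separate from the ensureCurrentBuffer question: a single
InputStream.read(byte[], int, int) call is only guaranteed to return at
least one byte, so the read() in the test may consume less than the
requested range before the assertion runs. A minimal sketch of a
deterministic alternative using the PositionedReadable readFully()
contract (fs, path and blockSize are illustrative placeholders, not
from this patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Read exactly one block from the start of the file. */
    static byte[] readOneBlock(FileSystem fs, Path path, int blockSize)
        throws IOException {
      byte[] buffer = new byte[blockSize];
      try (FSDataInputStream in = fs.open(path)) {
        // readFully() loops until the range is filled (or throws at EOF),
        // unlike read(), which may legally return early
        in.readFully(0, buffer, 0, buffer.length);
      }
      return buffer;
    }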





> s3a prefetching stream to support unbuffer()
> --------------------------------------------
>
>                 Key: HADOOP-18184
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18184
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Minor
>              Labels: pull-request-available
>
> Apache Impala uses unbuffer() to free up all client-side resources held by
> a stream, allowing it to keep a map of available (path -> stream) objects
> retained across queries.
> This saves having to reopen the files, with the cost of HEAD checks etc.
> S3AInputStream just closes its http connection. Here there is a lot more
> state to discard, but all memory and file storage must be freed.
> Until this is done, ITestS3AContractUnbuffer must skip when the prefetch
> stream is used.
> It's notable that the other tests don't fail, even though the stream
> doesn't implement the interface; the graceful degradation handles that.
> It should fail if the test xml resource says the stream does it, but the
> stream capabilities say it doesn't.
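
For illustration of the caller-side pattern described above: probe the
stream's capabilities before relying on unbuffer(), so the caller degrades
gracefully on streams that do not implement CanUnbuffer. A minimal sketch
against the public FSDataInputStream API (releaseIfPossible is a
hypothetical helper, not part of Hadoop):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    /** Free client-side resources while keeping the stream reusable. */
    static void releaseIfPossible(FSDataInputStream in) {
      // StreamCapabilities.UNBUFFER is the "in:unbuffer" probe string
      if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
        // frees buffers, cache files and connections; the stream stays
        // open, so a (path -> stream) cache avoids reopen + HEAD costs
        in.unbuffer();
      }
      // else: graceful degradation, nothing to release
    }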


